This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 25534ad [FLINK-13653][sql-client] ResultStore should avoid using RowTypeInfo when creating a result 25534ad is described below commit 25534adc2311c0198b0e115d12183e6ea90b4449 Author: Rui Li <li...@apache.org> AuthorDate: Tue Aug 13 20:57:18 2019 +0800 [FLINK-13653][sql-client] ResultStore should avoid using RowTypeInfo when creating a result Fix the issue that types with parameters, e.g. decimal, cannot be accessed via SQL client. This closes #9432. --- .../gateway/local/CollectBatchTableSink.java | 28 +++++--------- .../gateway/local/CollectStreamTableSink.java | 24 ++++-------- .../table/client/gateway/local/ResultStore.java | 5 ++- .../local/result/ChangelogCollectStreamResult.java | 5 ++- .../gateway/local/result/CollectStreamResult.java | 6 +-- .../result/MaterializedCollectBatchResult.java | 6 +-- .../result/MaterializedCollectStreamResult.java | 6 ++- .../table/client/gateway/local/DependencyTest.java | 13 +++++++ .../client/gateway/local/LocalExecutorITCase.java | 43 ++++++++++++++++++++-- .../MaterializedCollectStreamResultTest.java | 13 +++++++ .../src/test/resources/test-data-1.csv | 18 +++++++++ .../test/resources/test-sql-client-catalogs.yaml | 21 +++++++++++ 12 files changed, 139 insertions(+), 49 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java index 9dccfb7..1e3ca21 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java @@ -20,12 +20,13 @@ package org.apache.flink.table.client.gateway.local; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.Utils; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sinks.BatchTableSink; import org.apache.flink.table.sinks.OutputFormatTableSink; +import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; /** @@ -35,13 +36,12 @@ public class CollectBatchTableSink extends OutputFormatTableSink<Row> implements private final String accumulatorName; private final TypeSerializer<Row> serializer; + private final TableSchema tableSchema; - private String[] fieldNames; - private TypeInformation<?>[] fieldTypes; - - public CollectBatchTableSink(String accumulatorName, TypeSerializer<Row> serializer) { + public CollectBatchTableSink(String accumulatorName, TypeSerializer<Row> serializer, TableSchema tableSchema) { this.accumulatorName = accumulatorName; this.serializer = serializer; + this.tableSchema = tableSchema; } /** @@ -52,26 +52,18 @@ public class CollectBatchTableSink extends OutputFormatTableSink<Row> implements } @Override - public TypeInformation<Row> getOutputType() { - return Types.ROW_NAMED(fieldNames, fieldTypes); - } - - @Override - public String[] getFieldNames() { - return fieldNames; + public DataType getConsumedDataType() { + return getTableSchema().toRowDataType(); } @Override - public TypeInformation<?>[] getFieldTypes() { - return fieldTypes; + public TableSchema getTableSchema() { + return tableSchema; } @Override public CollectBatchTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { - final CollectBatchTableSink copy = new CollectBatchTableSink(accumulatorName, serializer); - copy.fieldNames = fieldNames; - copy.fieldTypes = fieldTypes; - return copy; + return new CollectBatchTableSink(accumulatorName, serializer, tableSchema); } @Override diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java index ce8565a..63adb07 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.experimental.CollectSink; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sinks.RetractStreamTableSink; import org.apache.flink.types.Row; @@ -39,37 +40,28 @@ public class CollectStreamTableSink implements RetractStreamTableSink<Row> { private final InetAddress targetAddress; private final int targetPort; private final TypeSerializer<Tuple2<Boolean, Row>> serializer; + private final TableSchema tableSchema; - private String[] fieldNames; - private TypeInformation<?>[] fieldTypes; - - public CollectStreamTableSink(InetAddress targetAddress, int targetPort, TypeSerializer<Tuple2<Boolean, Row>> serializer) { + public CollectStreamTableSink(InetAddress targetAddress, int targetPort, TypeSerializer<Tuple2<Boolean, Row>> serializer, TableSchema tableSchema) { this.targetAddress = targetAddress; this.targetPort = targetPort; this.serializer = serializer; + this.tableSchema = tableSchema; } @Override - public String[] getFieldNames() { - return fieldNames; - } - - @Override - public TypeInformation<?>[] getFieldTypes() { - return fieldTypes; + public TableSchema getTableSchema() { + return tableSchema; } @Override public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { - final CollectStreamTableSink copy = new CollectStreamTableSink(targetAddress, targetPort, serializer); - copy.fieldNames = fieldNames; - copy.fieldTypes = fieldTypes; - return copy; + return new CollectStreamTableSink(targetAddress, targetPort, serializer, tableSchema); } @Override public TypeInformation<Row> getRecordType() { - return Types.ROW_NAMED(fieldNames, fieldTypes); + return getTableSchema().toRowType(); } @Override diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java index c3cc12b..27a5278 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java @@ -67,10 +67,11 @@ public class ResultStore { final int gatewayPort = getGatewayPort(env.getDeployment()); if (env.getExecution().isChangelogMode()) { - return new ChangelogCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort); + return new ChangelogCollectStreamResult<>(outputType, schema, config, gatewayAddress, gatewayPort); } else { return new MaterializedCollectStreamResult<>( outputType, + schema, config, gatewayAddress, gatewayPort, @@ -82,7 +83,7 @@ public class ResultStore { if (!env.getExecution().isTableMode()) { throw new SqlExecutionException("Results of batch queries can only be served in table mode."); } - return new MaterializedCollectBatchResult<>(outputType, config); + return new MaterializedCollectBatchResult<>(schema, outputType, config); } } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java index 38e9126..43f2de5 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java @@ -21,6 +21,7 @@ package org.apache.flink.table.client.gateway.local.result; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.types.Row; @@ -38,9 +39,9 @@ public class ChangelogCollectStreamResult<C> extends CollectStreamResult<C> impl private List<Tuple2<Boolean, Row>> changeRecordBuffer; private static final int CHANGE_RECORD_BUFFER_SIZE = 5_000; - public ChangelogCollectStreamResult(RowTypeInfo outputType, ExecutionConfig config, + public ChangelogCollectStreamResult(RowTypeInfo outputType, TableSchema tableSchema, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort) { - super(outputType, config, gatewayAddress, gatewayPort); + super(outputType, tableSchema, config, gatewayAddress, gatewayPort); // prepare for changelog changeRecordBuffer = new ArrayList<>(); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java index 6cdebca..fe74cb9 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamUtils; import org.apache.flink.streaming.experimental.SocketStreamIterator; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.SqlClientException; import org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.client.gateway.TypedResult; @@ -55,7 +56,7 @@ public abstract class CollectStreamResult<C> extends BasicResult<C> implements D protected final Object resultLock; protected SqlExecutionException executionException; - public CollectStreamResult(RowTypeInfo outputType, ExecutionConfig config, + public CollectStreamResult(RowTypeInfo outputType, TableSchema tableSchema, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort) { this.outputType = outputType; @@ -73,8 +74,7 @@ public abstract class CollectStreamResult<C> extends BasicResult<C> implements D // create table sink // pass binding address and port such that sink knows where to send to - collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer) - .configure(outputType.getFieldNames(), outputType.getFieldTypes()); + collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer, tableSchema); retrievalThread = new ResultRetrievalThread(); monitoringThread = new JobMonitoringThread(); } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java index dc482d0..2fe61b8 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.SerializedListAccumulator; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.table.client.gateway.local.CollectBatchTableSink; @@ -54,12 +55,11 @@ public class MaterializedCollectBatchResult<C> extends BasicResult<C> implements private volatile boolean snapshotted = false; - public MaterializedCollectBatchResult(RowTypeInfo outputType, ExecutionConfig config) { + public MaterializedCollectBatchResult(TableSchema tableSchema, RowTypeInfo outputType, ExecutionConfig config) { this.outputType = outputType; accumulatorName = new AbstractID().toString(); - tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config)) - .configure(outputType.getFieldNames(), outputType.getFieldTypes()); + tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config), tableSchema); resultLock = new Object(); retrievalThread = new ResultRetrievalThread(); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java index 2becfda..7398758 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.types.Row; @@ -91,12 +92,13 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i @VisibleForTesting public MaterializedCollectStreamResult( RowTypeInfo outputType, + TableSchema tableSchema, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort, int maxRowCount, int overcommitThreshold) { - super(outputType, config, gatewayAddress, gatewayPort); + super(outputType, tableSchema, config, gatewayAddress, gatewayPort); if (maxRowCount <= 0) { this.maxRowCount = Integer.MAX_VALUE; @@ -118,6 +120,7 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i public MaterializedCollectStreamResult( RowTypeInfo outputType, + TableSchema tableSchema, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort, @@ -125,6 +128,7 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i this( outputType, + tableSchema, config, gatewayAddress, gatewayPort, diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java index 5fabe83..aad2da1 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java @@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.ObjectPath; @@ -44,6 +45,7 @@ import org.apache.flink.table.client.gateway.utils.TestTableSinkFactoryBase; import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.types.DataType; import org.junit.Test; @@ -172,6 +174,7 @@ public class DependencyTest { public static class TestHiveCatalogFactory extends HiveCatalogFactory { public static final String ADDITIONAL_TEST_DATABASE = "additional_test_database"; public static final String TEST_TABLE = "test_table"; + static final String TABLE_WITH_PARAMETERIZED_TYPES = "para_types_table"; @Override public Map<String, String> requiredContext() { @@ -213,11 +216,21 @@ public class DependencyTest { ), false ); + // create a table to test parameterized types + hiveCatalog.createTable(new ObjectPath("default", TABLE_WITH_PARAMETERIZED_TYPES), + tableWithParameterizedTypes(), + false); } catch (DatabaseAlreadyExistException | TableAlreadyExistException | DatabaseNotExistException e) { throw new CatalogException(e); } return hiveCatalog; } + + private CatalogTable tableWithParameterizedTypes() { + TableSchema tableSchema = TableSchema.builder().fields(new String[]{"dec", "ch", "vch"}, + new DataType[]{DataTypes.DECIMAL(10, 10), DataTypes.CHAR(5), DataTypes.VARCHAR(15)}).build(); + return new CatalogTableImpl(tableSchema, Collections.emptyMap(), ""); + } } } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index cbae581..c102acf 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -49,6 +49,7 @@ import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.Row; import org.apache.flink.util.TestLogger; +import org.junit.Assume; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; @@ -479,11 +480,14 @@ public class LocalExecutorITCase extends TestLogger { @Test public void testUseCatalogAndUseDatabase() throws Exception { final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString(); - final URL url = getClass().getClassLoader().getResource("test-data.csv"); - Objects.requireNonNull(url); + final URL url1 = getClass().getClassLoader().getResource("test-data.csv"); + final URL url2 = getClass().getClassLoader().getResource("test-data-1.csv"); + Objects.requireNonNull(url1); + Objects.requireNonNull(url2); final Map<String, String> replaceVars = new HashMap<>(); replaceVars.put("$VAR_PLANNER", planner); - replaceVars.put("$VAR_SOURCE_PATH1", url.getPath()); + replaceVars.put("$VAR_SOURCE_PATH1", url1.getPath()); + replaceVars.put("$VAR_SOURCE_PATH2", url2.getPath()); replaceVars.put("$VAR_EXECUTION_TYPE", "streaming"); replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath); replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); @@ -501,7 +505,8 @@ public class LocalExecutorITCase extends TestLogger { Arrays.asList(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, HiveCatalog.DEFAULT_DB), executor.listDatabases(session)); - assertEquals(Collections.emptyList(), executor.listTables(session)); + assertEquals(Collections.singletonList(DependencyTest.TestHiveCatalogFactory.TABLE_WITH_PARAMETERIZED_TYPES), + executor.listTables(session)); executor.useDatabase(session, DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE); @@ -529,6 +534,36 @@ public class LocalExecutorITCase extends TestLogger { executor.useCatalog(session, "nonexistingcatalog"); } + @Test + public void testParameterizedTypes() throws Exception { + // only blink planner supports parameterized types + Assume.assumeTrue(planner.equals("blink")); + final URL url1 = getClass().getClassLoader().getResource("test-data.csv"); + final URL url2 = getClass().getClassLoader().getResource("test-data-1.csv"); + Objects.requireNonNull(url1); + Objects.requireNonNull(url2); + final Map<String, String> replaceVars = new HashMap<>(); + replaceVars.put("$VAR_PLANNER", planner); + replaceVars.put("$VAR_SOURCE_PATH1", url1.getPath()); + replaceVars.put("$VAR_SOURCE_PATH2", url2.getPath()); + replaceVars.put("$VAR_EXECUTION_TYPE", "batch"); + replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); + replaceVars.put("$VAR_MAX_ROWS", "100"); + replaceVars.put("$VAR_RESULT_MODE", "table"); + + final Executor executor = createModifiedExecutor(CATALOGS_ENVIRONMENT_FILE, clusterClient, replaceVars); + final SessionContext session = new SessionContext("test-session", new Environment()); + executor.useCatalog(session, "hivecatalog"); + String resultID = executor.executeQuery(session, + "select * from " + DependencyTest.TestHiveCatalogFactory.TABLE_WITH_PARAMETERIZED_TYPES).getResultId(); + retrieveTableResult(executor, session, resultID); + + // make sure legacy types still work + executor.useCatalog(session, "default_catalog"); + resultID = executor.executeQuery(session, "select * from TableNumber3").getResultId(); + retrieveTableResult(executor, session, resultID); + } + private void executeStreamQueryTable( Map<String, String> replaceVars, String query, diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java index c7636cd..cf85011 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java @@ -22,7 +22,10 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.gateway.TypedResult; +import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import org.junit.Test; @@ -43,11 +46,14 @@ public class MaterializedCollectStreamResultTest { @Test public void testSnapshot() throws UnknownHostException { final RowTypeInfo type = new RowTypeInfo(Types.STRING, Types.LONG); + TableSchema tableSchema = TableSchema.builder().fields( + new String[]{"f0", "f1"}, new DataType[]{DataTypes.STRING(), DataTypes.BIGINT()}).build(); TestMaterializedCollectStreamResult<?> result = null; try { result = new TestMaterializedCollectStreamResult<>( type, + tableSchema, new ExecutionConfig(), InetAddress.getLocalHost(), 0, @@ -91,11 +97,14 @@ public class MaterializedCollectStreamResultTest { @Test public void testLimitedSnapshot() throws UnknownHostException { final RowTypeInfo type = new RowTypeInfo(Types.STRING, Types.LONG); + TableSchema tableSchema = TableSchema.builder().fields( + new String[]{"f0", "f1"}, new DataType[]{DataTypes.STRING(), DataTypes.BIGINT()}).build(); TestMaterializedCollectStreamResult<?> result = null; try { result = new TestMaterializedCollectStreamResult<>( type, + tableSchema, new ExecutionConfig(), InetAddress.getLocalHost(), 0, @@ -146,6 +155,7 @@ public class MaterializedCollectStreamResultTest { public TestMaterializedCollectStreamResult( RowTypeInfo outputType, + TableSchema tableSchema, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort, @@ -154,6 +164,7 @@ public class MaterializedCollectStreamResultTest { super( outputType, + tableSchema, config, gatewayAddress, gatewayPort, @@ -163,6 +174,7 @@ public class MaterializedCollectStreamResultTest { public TestMaterializedCollectStreamResult( RowTypeInfo outputType, + TableSchema tableSchema, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort, @@ -170,6 +182,7 @@ public class MaterializedCollectStreamResultTest { super( outputType, + tableSchema, config, gatewayAddress, gatewayPort, diff --git a/flink-table/flink-sql-client/src/test/resources/test-data-1.csv b/flink-table/flink-sql-client/src/test/resources/test-data-1.csv new file mode 100644 index 0000000..1794cb7 --- /dev/null +++ b/flink-table/flink-sql-client/src/test/resources/test-data-1.csv @@ -0,0 +1,18 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +123.123,abcd \ No newline at end of file diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml index d4b8010..ff2ccea 100644 --- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml +++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml @@ -71,6 +71,26 @@ tables: - name: TestView2 type: view query: SELECT * FROM default_catalog.default_database.TestView1 + - name: TableNumber3 + type: source-table + $VAR_UPDATE_MODE + schema: + - name: DecimalField + type: DECIMAL + - name: StringField + type: VARCHAR + connector: + type: filesystem + path: "$VAR_SOURCE_PATH2" + format: + type: csv + fields: + - name: DecimalField + type: DECIMAL + - name: StringField + type: VARCHAR + line-delimiter: "\n" + comment-prefix: "#" functions: - name: scalarUDF @@ -98,6 +118,7 @@ functions: value: 5 execution: + planner: "$VAR_PLANNER" type: "$VAR_EXECUTION_TYPE" time-characteristic: event-time periodic-watermarks-interval: 99