This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 35e2fca  [FLINK-18237][fs-connector] Exception when reading filesystem partitioned table with stream mode
35e2fca is described below

commit 35e2fca5c9b59a87a6f1a17628daf75d3de575b0
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jun 11 19:41:36 2020 +0800

    [FLINK-18237][fs-connector] Exception when reading filesystem partitioned table with stream mode
    
    This closes #12576
---
 .../formats/csv/CsvFilesystemStreamITCase.java     |  9 +++----
 ...ase.java => CsvFilesystemStreamSinkITCase.java} |  4 +--
 .../table/filesystem/FileSystemTableSource.java    | 31 +++++++++++++++++++---
 3 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
index 976e70e..6e5d2e1 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.formats.csv;
 
-import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase;
+import 
org.apache.flink.table.planner.runtime.stream.sql.StreamFileSystemITCaseBase;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -26,13 +26,12 @@ import java.util.List;
 /**
  * ITCase to test csv format for {@link CsvFileSystemFormatFactory} in stream 
mode.
  */
-public class CsvFilesystemStreamITCase extends FsStreamingSinkITCaseBase {
+public class CsvFilesystemStreamITCase extends StreamFileSystemITCaseBase {
+
        @Override
-       public String[] additionalProperties() {
+       public String[] formatProperties() {
                List<String> ret = new ArrayList<>();
                ret.add("'format'='csv'");
-               // for test purpose
-               ret.add("'sink.rolling-policy.file-size'='1b'");
                return ret.toArray(new String[0]);
        }
 }
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java
similarity index 92%
copy from flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
copy to flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java
index 976e70e..8b694dc 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java
@@ -24,9 +24,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * ITCase to test csv format for {@link CsvFileSystemFormatFactory} in stream 
mode.
+ * ITCase to test csv format for {@link CsvFileSystemFormatFactory} for 
streaming sink.
  */
-public class CsvFilesystemStreamITCase extends FsStreamingSinkITCaseBase {
+public class CsvFilesystemStreamSinkITCase extends FsStreamingSinkITCaseBase {
        @Override
        public String[] additionalProperties() {
                List<String> ret = new ArrayList<>();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
index 35ba899..d834907 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
@@ -19,24 +19,31 @@
 package org.apache.flink.table.filesystem;
 
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
 import org.apache.flink.table.sources.FilterableTableSource;
-import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.sources.LimitableTableSource;
 import org.apache.flink.table.sources.PartitionableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;
+import org.apache.flink.table.utils.TableConnectorUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -52,7 +59,8 @@ import static 
org.apache.flink.table.filesystem.FileSystemTableFactory.createFor
 /**
  * File system table source.
  */
-public class FileSystemTableSource extends InputFormatTableSource<RowData> 
implements
+public class FileSystemTableSource implements
+               StreamTableSource<RowData>,
                PartitionableTableSource,
                ProjectableTableSource<RowData>,
                LimitableTableSource<RowData>,
@@ -111,7 +119,22 @@ public class FileSystemTableSource extends 
InputFormatTableSource<RowData> imple
        }
 
        @Override
-       public InputFormat<RowData, ?> getInputFormat() {
+       public DataStream<RowData> getDataStream(StreamExecutionEnvironment 
execEnv) {
+               @SuppressWarnings("unchecked")
+               TypeInformation<RowData> typeInfo =
+                               (TypeInformation<RowData>) 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
+               // Avoid using ContinuousFileMonitoringFunction
+               InputFormatSourceFunction<RowData> func = new 
InputFormatSourceFunction<>(getInputFormat(), typeInfo);
+               DataStreamSource<RowData> source = execEnv.addSource(func, 
explainSource(), typeInfo);
+               return source.name(explainSource());
+       }
+
+       @Override
+       public boolean isBounded() {
+               return true;
+       }
+
+       private InputFormat<RowData, ?> getInputFormat() {
                // When this table has no partition, just return a empty source.
                if (!partitionKeys.isEmpty() && 
getOrFetchPartitions().isEmpty()) {
                        return new CollectionInputFormat<>(new ArrayList<>(), 
null);
@@ -301,7 +324,7 @@ public class FileSystemTableSource extends 
InputFormatTableSource<RowData> imple
 
        @Override
        public String explainSource() {
-               return super.explainSource() +
+               return TableConnectorUtils.generateRuntimeName(getClass(), 
getTableSchema().getFieldNames()) +
                                (readPartitions == null ? "" : ", 
readPartitions=" + readPartitions) +
                                (selectFields == null ? "" : ", selectFields=" 
+ Arrays.toString(selectFields)) +
                                (limit == null ? "" : ", limit=" + limit) +

Reply via email to