shuiqiangchen commented on code in PR #19140:
URL: https://github.com/apache/flink/pull/19140#discussion_r912900535
##########
flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatTableSource.java:
##########
@@ -18,51 +18,56 @@
package org.apache.flink.table.utils.python;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.sources.InputFormatTableSource;
-import org.apache.flink.types.Row;
+import org.apache.flink.api.common.python.PythonBridgeUtils;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
-import java.util.List;
+import java.io.IOException;
-/** An {@link InputFormatTableSource} created by python 'from_element' method.
*/
-@Internal
-public class PythonInputFormatTableSource extends InputFormatTableSource<Row> {
-
- /**
- * The input format which contains the python data collection, usually
created by {@link
- * PythonTableUtils#getInputFormat(List, TypeInformation,
ExecutionConfig)} method.
- */
- private final InputFormat<Row, ? extends InputSplit> inputFormat;
-
- /**
- * The row type info of the python data. It is generated by the python
'from_element' method.
- */
- private final RowTypeInfo rowTypeInfo;
+/** Implementation of {@link ScanTableSource} for python elements table. */
+public class PythonInputFormatTableSource implements ScanTableSource {
+ private final String filePath;
+ private final boolean batched;
+ private final DataType producedDataType;
public PythonInputFormatTableSource(
- InputFormat<Row, ? extends InputSplit> inputFormat, RowTypeInfo
rowTypeInfo) {
- this.inputFormat = inputFormat;
- this.rowTypeInfo = rowTypeInfo;
+ String filePath, boolean batched, DataType producedDataType) {
+ this.filePath = filePath;
+ this.batched = batched;
+ this.producedDataType = producedDataType;
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new PythonInputFormatTableSource(filePath, batched,
producedDataType);
}
@Override
- public InputFormat<Row, ?> getInputFormat() {
- return inputFormat;
+ public String asSummaryString() {
+ return "PythonInputFormatTableSource";
}
@Override
- public TableSchema getTableSchema() {
- return TableSchema.fromTypeInfo(rowTypeInfo);
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
}
@Override
- public TypeInformation<Row> getReturnType() {
- return rowTypeInfo;
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext
runtimeProviderContext) {
+ try {
+ InputFormat<RowData, ?> inputFormat =
+ PythonTableUtils.getInputFormat(
+ PythonBridgeUtils.readPythonObjects(filePath,
batched),
Review Comment:
Yes, here we are always in batched mode. It is an option that makes the SerDe
on the Python side more extensible.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]