This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c3daf84d551 [FLINK-36269][python] Remove usage about TableEnvironmentInternal#fromTableSource in python module (#25322) c3daf84d551 is described below commit c3daf84d551b3cf7f717221912f17dcb336ff176 Author: Xuyang <xyzhong...@163.com> AuthorDate: Thu Jan 2 21:00:30 2025 +0800 [FLINK-36269][python] Remove usage about TableEnvironmentInternal#fromTableSource in python module (#25322) --- .../docs/dev/python/table/table_environment.md | 11 ---- .../docs/dev/python/table/table_environment.md | 11 ---- flink-python/pyflink/table/table_environment.py | 6 +-- .../flink/table/runtime/arrow/ArrowUtils.java | 34 ++++++++++-- .../flink/table/runtime/arrow/ByteArrayUtils.java | 54 +++++++++++++++++++ .../runtime/arrow/sources/ArrowTableSource.java | 49 ++++++++++------- .../arrow/sources/ArrowTableSourceFactory.java | 62 ++++++++++++++++++++++ .../arrow/sources/ArrowTableSourceOptions.java | 34 ++++++++++++ .../org.apache.flink.table.factories.Factory | 1 + 9 files changed, 214 insertions(+), 48 deletions(-) diff --git a/docs/content.zh/docs/dev/python/table/table_environment.md b/docs/content.zh/docs/dev/python/table/table_environment.md index b9347ccf0c1..b11cb632e9b 100644 --- a/docs/content.zh/docs/dev/python/table/table_environment.md +++ b/docs/content.zh/docs/dev/python/table/table_environment.md @@ -206,17 +206,6 @@ TableEnvironment API </tr> </thead> <tbody> - <tr> - <td> - <strong>from_table_source(table_source)</strong> - </td> - <td> - 通过 table source 创建一张表。 - </td> - <td class="text-center"> - {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.from_table_source" name="链接">}} - </td> - </tr> <tr> <td> <strong>scan(*table_path)</strong> diff --git a/docs/content/docs/dev/python/table/table_environment.md b/docs/content/docs/dev/python/table/table_environment.md index 90bceefd336..5f9aaa7f26a 100644 --- a/docs/content/docs/dev/python/table/table_environment.md +++ b/docs/content/docs/dev/python/table/table_environment.md @@ -206,17 +206,6 @@ These APIs are used to create/remove Table API/SQL Tables and write queries: </tr> </thead> <tbody> - <tr> - <td> - <strong>from_table_source(table_source)</strong> - </td> - <td> - Creates a table from a table source. - </td> - <td class="text-center"> - {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.from_table_source" name="link">}} - </td> - </tr> <tr> <td> <strong>scan(*table_path)</strong> diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index b80a4233a46..8defd3a1f18 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -1368,10 +1368,10 @@ class TableEnvironment(object): data_type = data_type.bridgedTo( load_java_class('org.apache.flink.table.data.RowData')) - j_arrow_table_source = \ - jvm.org.apache.flink.table.runtime.arrow.ArrowUtils.createArrowTableSource( + j_arrow_table_source_descriptor = \ + jvm.org.apache.flink.table.runtime.arrow.ArrowUtils.createArrowTableSourceDesc( data_type, temp_file.name) - return Table(self._j_tenv.fromTableSource(j_arrow_table_source), self) + return Table(getattr(self._j_tenv, "from")(j_arrow_table_source_descriptor), self) finally: os.unlink(temp_file.name) diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java index a0461a917d5..818be062222 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java @@ -24,7 +24,9 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableDescriptor; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.api.internal.TableImpl; import org.apache.flink.table.data.ArrayData; @@ -32,7 +34,8 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.columnar.vector.ColumnVector; import org.apache.flink.table.data.util.DataFormatConverters; import org.apache.flink.table.operations.OutputConversionModifyOperation; -import org.apache.flink.table.runtime.arrow.sources.ArrowTableSource; +import org.apache.flink.table.runtime.arrow.sources.ArrowTableSourceFactory; +import org.apache.flink.table.runtime.arrow.sources.ArrowTableSourceOptions; import org.apache.flink.table.runtime.arrow.vectors.ArrowArrayColumnVector; import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector; import org.apache.flink.table.runtime.arrow.vectors.ArrowBinaryColumnVector; @@ -159,6 +162,8 @@ import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; +import static org.apache.flink.table.types.DataType.getFieldNames; + /** Utilities for Arrow. */ @Internal public final class ArrowUtils { @@ -475,10 +480,31 @@ public final class ArrowUtils { } } - public static ArrowTableSource createArrowTableSource(DataType dataType, String fileName) - throws IOException { + public static TableDescriptor createArrowTableSourceDesc(DataType dataType, String fileName) { + List<String> fieldNames = getFieldNames(dataType); + List<DataType> fieldTypes = dataType.getChildren(); + org.apache.flink.table.api.Schema.Builder schemaBuilder = + org.apache.flink.table.api.Schema.newBuilder(); + for (int i = 0; i < fieldNames.size(); i++) { + schemaBuilder.column(fieldNames.get(i), fieldTypes.get(i)); + } + + try { + byte[][] data = readArrowBatches(fileName); + return TableDescriptor.forConnector(ArrowTableSourceFactory.IDENTIFIER) + .option( + ArrowTableSourceOptions.DATA, + ByteArrayUtils.twoDimByteArrayToString(data)) + .schema(schemaBuilder.build()) + .build(); + } catch (Throwable e) { + throw new TableException("Failed to read the arrow data from " + fileName, e); + } + } + + public static byte[][] readArrowBatches(String fileName) throws IOException { try (FileInputStream fis = new FileInputStream(fileName)) { - return new ArrowTableSource(dataType, readArrowBatches(fis.getChannel())); + return readArrowBatches(fis.getChannel()); } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ByteArrayUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ByteArrayUtils.java new file mode 100644 index 00000000000..9661b188518 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ByteArrayUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow; + +import org.apache.flink.annotation.Internal; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Base64; + +/** A utility class for converting byte[][] to String and String to byte[][]. */ +@Internal +public class ByteArrayUtils { + + /** Convert byte[][] to String. */ + public static String twoDimByteArrayToString(byte[][] byteArray) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(byteArray); + oos.flush(); + byte[] serializedArray = bos.toByteArray(); + + return Base64.getEncoder().encodeToString(serializedArray); + } + + /** Convert String to byte[][]. */ + public static byte[][] stringToTwoDimByteArray(String str) + throws IOException, ClassNotFoundException { + byte[] bytes = Base64.getDecoder().decode(str); + + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bis); + return (byte[][]) ois.readObject(); + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/ArrowTableSource.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/ArrowTableSource.java index 0f88072b9a0..c74a91b1702 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/ArrowTableSource.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/ArrowTableSource.java @@ -19,43 +19,54 @@ package org.apache.flink.table.runtime.arrow.sources; import org.apache.flink.annotation.Internal; -import org.apache.flink.legacy.table.sources.StreamTableSource; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.legacy.api.TableSchema; +import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.runtime.arrow.ByteArrayUtils; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.utils.DataTypeUtils; -/** A {@link StreamTableSource} for serialized arrow record batch data. */ +/** A {@link ScanTableSource} for serialized arrow record batch data. */ @Internal -public class ArrowTableSource implements StreamTableSource<RowData> { +public class ArrowTableSource implements ScanTableSource { - final DataType dataType; - final byte[][] arrowData; + private final DataType dataType; - public ArrowTableSource(DataType dataType, byte[][] arrowData) { + private final byte[][] arrowData; + + public ArrowTableSource(DataType dataType, String data) { + this.dataType = dataType; + try { + this.arrowData = ByteArrayUtils.stringToTwoDimByteArray(data); + } catch (Throwable e) { + throw new TableException( + "Failed to convert the data from String to byte[][].\nThe data is: " + data, e); + } + } + + private ArrowTableSource(DataType dataType, byte[][] arrowData) { this.dataType = dataType; this.arrowData = arrowData; } @Override - public boolean isBounded() { - return true; + public DynamicTableSource copy() { + return new ArrowTableSource(dataType, arrowData); } @Override - public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) { - return execEnv.addSource(new ArrowSourceFunction(dataType, arrowData)); + public String asSummaryString() { + return "ArrowTableSource"; } @Override - public TableSchema getTableSchema() { - return TableSchema.fromResolvedSchema(DataTypeUtils.expandCompositeTypeToSchema(dataType)); + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); } @Override - public DataType getProducedDataType() { - return dataType; + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + return SourceFunctionProvider.of(new ArrowSourceFunction(dataType, arrowData), true); } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/ArrowTableSourceFactory.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/ArrowTableSourceFactory.java new file mode 100644 index 00000000000..154f1e1ea27 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/ArrowTableSourceFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.sources; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; + +import java.util.HashSet; +import java.util.Set; + +/** Factory for creating configured instances of {@link ArrowTableSource}.. */ +public class ArrowTableSourceFactory implements DynamicTableSourceFactory { + + public static final String IDENTIFIER = "python-arrow-source"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(ArrowTableSourceOptions.DATA); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return new HashSet<>(); + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + ReadableConfig tableOptions = helper.getOptions(); + + String data = tableOptions.get(ArrowTableSourceOptions.DATA); + DataType dataType = context.getPhysicalRowDataType(); + return new ArrowTableSource(dataType, data); + } +} diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/ArrowTableSourceOptions.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/ArrowTableSourceOptions.java new file mode 100644 index 00000000000..c57d26b8273 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/ArrowTableSourceOptions.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.sources; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** Table options for the {@link ArrowTableSource}. */ +public class ArrowTableSourceOptions { + + public static final ConfigOption<String> DATA = + ConfigOptions.key("data") + .stringType() + .noDefaultValue() + .withDescription( + "This is the data serialized by Arrow with a byte two-dimensional array. " + + "Note: The byte two-dimensional array is converted into a string using base64."); +} diff --git a/flink-python/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-python/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index a1a1ce73c4a..2101c564248 100644 --- a/flink-python/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-python/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,3 +14,4 @@ # limitations under the License. org.apache.flink.table.utils.python.PythonDynamicTableFactory +org.apache.flink.table.runtime.arrow.sources.ArrowTableSourceFactory