This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2489409a2337ca2909075330953e61e591473054
Author: superche <[email protected]>
AuthorDate: Tue Dec 6 18:20:47 2022 +0800

    [HUDI-5331] Add schema settings with stream api (#7384)

    * Add schema set with stream api.

    Co-authored-by: superche <[email protected]>
---
 .../java/org/apache/hudi/util/HoodiePipeline.java |  15 +++
 .../apache/hudi/sink/ITTestDataStreamWrite.java   | 127 +++++++++++++++++++++
 2 files changed, 142 insertions(+)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
index f95367c8361..61ec2fccaa0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.util;
 
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.hudi.adapter.Utils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieTableFactory;
@@ -125,6 +127,19 @@ public class HoodiePipeline {
       return this;
     }
 
+    /**
+     * Add table schema.
+     */
+    public Builder schema(Schema schema) {
+      for (Schema.UnresolvedColumn column : schema.getColumns()) {
+        column(column.toString());
+      }
+      if (schema.getPrimaryKey().isPresent()) {
+        pk(schema.getPrimaryKey().get().getColumnNames().stream().map(EncodingUtils::escapeIdentifier).collect(Collectors.joining(", ")));
+      }
+      return this;
+    }
+
     /**
      * Add a config option.
      */
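For reference, the new schema(Schema) method simply expands a Flink Schema into the builder's pre-existing column() and pk() calls. Below is a minimal sketch of the rough equivalence; the table name and columns are illustrative, not part of this commit:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.hudi.util.HoodiePipeline;

public class SchemaEquivalenceSketch {
  public static void main(String[] args) {
    Schema schema = Schema.newBuilder()
        .column("uuid", DataTypes.STRING().notNull())
        .column("name", DataTypes.STRING())
        .primaryKey("uuid")
        .build();

    // With this commit, one call covers all columns plus the primary key:
    HoodiePipeline.Builder viaSchema = HoodiePipeline.builder("t1")
        .schema(schema);

    // which should behave like the manual, column-by-column declaration:
    // column(...) receives each column's DDL-style string and pk(...) the
    // escaped primary-key column names.
    HoodiePipeline.Builder byHand = HoodiePipeline.builder("t1")
        .column("`uuid` STRING NOT NULL")
        .column("`name` STRING")
        .pk("`uuid`");
  }
}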
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 6ab4b1b6e0d..ad5642924da 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -22,10 +22,13 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
+import org.apache.hudi.table.catalog.HoodieCatalog;
+import org.apache.hudi.table.catalog.TableOptionProperties;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.HoodiePipeline;
 import org.apache.hudi.util.StreamerUtil;
@@ -49,6 +52,10 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -70,6 +77,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+
 /**
  * Integration test for Flink Hoodie stream sink.
  */
@@ -427,4 +437,121 @@ public class ITTestDataStreamWrite extends TestLogger {
     execute(execEnv, false, "Api_Sink_Test");
     TestData.checkWrittenDataCOW(tempFile, EXPECTED);
   }
+
+  @Test
+  public void testHoodiePipelineBuilderSourceWithSchemaSet() throws Exception {
+    //create a StreamExecutionEnvironment instance.
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(1);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    // create table dir
+    final String dbName = DEFAULT_DATABASE.defaultValue();
+    final String tableName = "t1";
+    File testTable = new File(tempFile, dbName + Path.SEPARATOR + tableName);
+    testTable.mkdir();
+
+    Configuration conf = TestConfigurations.getDefaultConf(testTable.toURI().toString());
+    conf.setString(FlinkOptions.TABLE_NAME, tableName);
+    conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
+
+    // write 3 batches of data set
+    TestData.writeData(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeData(TestData.dataSetInsert(3, 4), conf);
+    TestData.writeData(TestData.dataSetInsert(5, 6), conf);
+
+    String latestCommit = TestUtils.getLastCompleteInstant(testTable.toURI().toString());
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), testTable.toURI().toString());
+    options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);
+
+    // create hoodie catalog, in order to get the table schema
+    Configuration catalogConf = new Configuration();
+    catalogConf.setString(CATALOG_PATH.key(), tempFile.toURI().toString());
+    catalogConf.setString(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
+    HoodieCatalog catalog = new HoodieCatalog("hudi", catalogConf);
+    catalog.open();
+    // get hoodieTable
+    ObjectPath tablePath = new ObjectPath(dbName, tableName);
+    TableOptionProperties.createProperties(testTable.toURI().toString(), HadoopConfigurations.getHadoopConf(catalogConf), options);
+    CatalogBaseTable hoodieTable = catalog.getTable(tablePath);
+
+    //read a hoodie table use low-level source api.
+    HoodiePipeline.Builder builder = HoodiePipeline.builder("test_source")
+        .schema(hoodieTable.getUnresolvedSchema())
+        .pk("uuid")
+        .partition("partition")
+        .options(options);
+    DataStream<RowData> rowDataDataStream = builder.source(execEnv);
+    List<RowData> result = new ArrayList<>();
+    rowDataDataStream.executeAndCollect().forEachRemaining(result::add);
+    TimeUnit.SECONDS.sleep(2);//sleep 2 second for collect data
+    TestData.assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
+  }
+
+  @Test
+  public void testHoodiePipelineBuilderSinkWithSchemaSet() throws Exception {
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    Map<String, String> options = new HashMap<>();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(4);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
+    options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+    Configuration conf = Configuration.fromMap(options);
+    // Read from file source
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        InternalTypeInfo.of(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+    format.setFilesFilter(FilePathFilter.createDefaultFilter());
+    format.setCharsetName("UTF-8");
+
+    DataStream dataStream = execEnv
+        // use continuous file source to trigger checkpoint
+        .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
+        .name("continuous_file_source")
+        .setParallelism(1)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(4);
+
+    Schema schema =
+        Schema.newBuilder()
+            .column("uuid", DataTypes.STRING().notNull())
+            .column("name", DataTypes.STRING())
+            .column("age", DataTypes.INT())
+            .column("ts", DataTypes.TIMESTAMP(3))
+            .column("partition", DataTypes.STRING())
+            .primaryKey("uuid")
+            .build();
+
+    //sink to hoodie table use low-level sink api.
+    HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
+        .schema(schema)
+        .partition("partition")
+        .options(options);
+
+    builder.sink(dataStream, false);
+
+    execute(execEnv, false, "Api_Sink_Test");
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED);
+  }
 }
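Outside the test harness, the source-side pattern from testHoodiePipelineBuilderSourceWithSchemaSet might look like the standalone sketch below. The warehouse path, database, and table name are illustrative, and the table is assumed to already exist in the catalog so its schema can be resolved:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.catalog.HoodieCatalog;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;

public class CatalogSchemaSourceSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Resolve the table schema from a Hudi catalog instead of declaring it by hand.
    Configuration catalogConf = new Configuration();
    catalogConf.setString(CATALOG_PATH.key(), "file:///tmp/hudi"); // illustrative warehouse path
    catalogConf.setString(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
    HoodieCatalog catalog = new HoodieCatalog("hudi", catalogConf);
    catalog.open();
    CatalogBaseTable table = catalog.getTable(new ObjectPath(DEFAULT_DATABASE.defaultValue(), "t1"));

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "file:///tmp/hudi/default/t1"); // illustrative table path

    // Feed the unresolved schema straight into the pipeline builder.
    DataStream<RowData> source = HoodiePipeline.builder("t1")
        .schema(table.getUnresolvedSchema())
        .pk("uuid") // as in the test; column names are illustrative
        .partition("partition")
        .options(options)
        .source(env);
    source.print();
    env.execute("Schema_From_Catalog");
  }
}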
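And the sink-side counterpart from testHoodiePipelineBuilderSinkWithSchemaSet, trimmed to a sketch. The input stream is assumed to be produced upstream (for example, the deserialized JSON file source used in the test), and the path and table name are illustrative:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class SchemaSinkSketch {

  // `input` is assumed to be built upstream with proper RowData type info,
  // the way the continuous file source in the test does it.
  public static void sinkWithSchema(DataStream<RowData> input, String basePath) {
    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), basePath);

    // Declare the whole table shape once and hand it to the builder.
    Schema schema = Schema.newBuilder()
        .column("uuid", DataTypes.STRING().notNull())
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .column("ts", DataTypes.TIMESTAMP(3))
        .column("partition", DataTypes.STRING())
        .primaryKey("uuid")
        .build();

    HoodiePipeline.builder("t1") // illustrative table name
        .schema(schema)
        .partition("partition")
        .options(options)
        .sink(input, false); // false = treat the input as unbounded
  }
}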
