This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 74f8d94b73b [HUDI-5331] Add schema settings with stream api (#7384)
74f8d94b73b is described below
commit 74f8d94b73bebe7ed67b2febf4f4a587eefee12a
Author: superche <[email protected]>
AuthorDate: Tue Dec 6 18:20:47 2022 +0800
[HUDI-5331] Add schema settings with stream api (#7384)
* Add schema set with stream api.
Co-authored-by: superche <[email protected]>
---
.../java/org/apache/hudi/util/HoodiePipeline.java | 15 +++
.../apache/hudi/sink/ITTestDataStreamWrite.java | 127 +++++++++++++++++++++
2 files changed, 142 insertions(+)
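In short, HoodiePipeline.Builder gains a schema(Schema) method so that a Flink Table API Schema can be applied in one call instead of per-column column() and pk() calls. A minimal usage sketch, modeled on the new sink test below (Schema and DataTypes are org.apache.flink.table.api classes; the table name, path, and the DataStream<RowData> named dataStream are illustrative placeholders, not part of this commit):

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "/tmp/hoodie/t1"); // placeholder table path

    Schema schema = Schema.newBuilder()
        .column("uuid", DataTypes.STRING().notNull())
        .column("name", DataTypes.STRING())
        .column("ts", DataTypes.TIMESTAMP(3))
        .column("partition", DataTypes.STRING())
        .primaryKey("uuid")
        .build();

    HoodiePipeline.Builder builder = HoodiePipeline.builder("t1")
        .schema(schema)         // new in this commit: expands into column(...) and pk(...) calls
        .partition("partition")
        .options(options);

    builder.sink(dataStream, false); // dataStream: an existing DataStream<RowData>; bounded = false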
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
index f95367c8361..61ec2fccaa0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
@@ -18,6 +18,8 @@
package org.apache.hudi.util;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.utils.EncodingUtils;
import org.apache.hudi.adapter.Utils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTableFactory;
@@ -125,6 +127,19 @@ public class HoodiePipeline {
return this;
}
+ /**
+ * Add table schema.
+ */
+ public Builder schema(Schema schema) {
+ for (Schema.UnresolvedColumn column : schema.getColumns()) {
+ column(column.toString());
+ }
+ if (schema.getPrimaryKey().isPresent()) {
+ pk(schema.getPrimaryKey().get().getColumnNames().stream().map(EncodingUtils::escapeIdentifier).collect(Collectors.joining(", ")));
+ }
+ return this;
+ }
+
/**
* Add a config option.
*/
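The new method just replays the existing builder primitives: every Schema.UnresolvedColumn is forwarded to column(column.toString()), and a declared primary key is escaped and joined into a single pk(...) call. For a two-column schema with primary key uuid, this is roughly equivalent to the hand-written calls below (illustrative only; the exact column strings depend on Flink's UnresolvedColumn#toString):

    // builder.schema(schema) behaves approximately like:
    builder
        .column("`uuid` STRING NOT NULL")
        .column("`name` STRING")
        .pk("`uuid`");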
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 6ab4b1b6e0d..ad5642924da 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -22,10 +22,13 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
+import org.apache.hudi.table.catalog.HoodieCatalog;
+import org.apache.hudi.table.catalog.TableOptionProperties;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.HoodiePipeline;
import org.apache.hudi.util.StreamerUtil;
@@ -49,6 +52,10 @@ import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -70,6 +77,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+
/**
* Integration test for Flink Hoodie stream sink.
*/
@@ -427,4 +437,121 @@ public class ITTestDataStreamWrite extends TestLogger {
execute(execEnv, false, "Api_Sink_Test");
TestData.checkWrittenDataCOW(tempFile, EXPECTED);
}
+
+ @Test
+ public void testHoodiePipelineBuilderSourceWithSchemaSet() throws Exception {
+ //create a StreamExecutionEnvironment instance.
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.getConfig().disableObjectReuse();
+ execEnv.setParallelism(1);
+ // set up checkpoint interval
+ execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+ execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+ // create table dir
+ final String dbName = DEFAULT_DATABASE.defaultValue();
+ final String tableName = "t1";
+ File testTable = new File(tempFile, dbName + Path.SEPARATOR + tableName);
+ testTable.mkdir();
+
+ Configuration conf = TestConfigurations.getDefaultConf(testTable.toURI().toString());
+ conf.setString(FlinkOptions.TABLE_NAME, tableName);
+ conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
+
+ // write 3 batches of data set
+ TestData.writeData(TestData.dataSetInsert(1, 2), conf);
+ TestData.writeData(TestData.dataSetInsert(3, 4), conf);
+ TestData.writeData(TestData.dataSetInsert(5, 6), conf);
+
+ String latestCommit = TestUtils.getLastCompleteInstant(testTable.toURI().toString());
+
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), testTable.toURI().toString());
+ options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);
+
+ // create hoodie catalog, in order to get the table schema
+ Configuration catalogConf = new Configuration();
+ catalogConf.setString(CATALOG_PATH.key(), tempFile.toURI().toString());
+ catalogConf.setString(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
+ HoodieCatalog catalog = new HoodieCatalog("hudi", catalogConf);
+ catalog.open();
+ // get hoodieTable
+ ObjectPath tablePath = new ObjectPath(dbName, tableName);
+ TableOptionProperties.createProperties(testTable.toURI().toString(), HadoopConfigurations.getHadoopConf(catalogConf), options);
+ CatalogBaseTable hoodieTable = catalog.getTable(tablePath);
+
+ // read the hoodie table using the low-level source api.
+ HoodiePipeline.Builder builder = HoodiePipeline.builder("test_source")
+ .schema(hoodieTable.getUnresolvedSchema())
+ .pk("uuid")
+ .partition("partition")
+ .options(options);
+ DataStream<RowData> rowDataDataStream = builder.source(execEnv);
+ List<RowData> result = new ArrayList<>();
+ rowDataDataStream.executeAndCollect().forEachRemaining(result::add);
+ TimeUnit.SECONDS.sleep(2); // sleep 2 seconds to collect the data
+ TestData.assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
+ }
+
+ @Test
+ public void testHoodiePipelineBuilderSinkWithSchemaSet() throws Exception {
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ Map<String, String> options = new HashMap<>();
+ execEnv.getConfig().disableObjectReuse();
+ execEnv.setParallelism(4);
+ // set up checkpoint interval
+ execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+ execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+ options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
+ options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+ Configuration conf = Configuration.fromMap(options);
+ // Read from file source
+ RowType rowType =
+ (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ .getLogicalType();
+
+ JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+ rowType,
+ InternalTypeInfo.of(rowType),
+ false,
+ true,
+ TimestampFormat.ISO_8601
+ );
+ String sourcePath = Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("test_source.data")).toString();
+
+ TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+ format.setFilesFilter(FilePathFilter.createDefaultFilter());
+ format.setCharsetName("UTF-8");
+
+ DataStream dataStream = execEnv
+ // use continuous file source to trigger checkpoint
+ .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
+ .name("continuous_file_source")
+ .setParallelism(1)
+ .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+ .setParallelism(4);
+
+ Schema schema =
+ Schema.newBuilder()
+ .column("uuid", DataTypes.STRING().notNull())
+ .column("name", DataTypes.STRING())
+ .column("age", DataTypes.INT())
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .column("partition", DataTypes.STRING())
+ .primaryKey("uuid")
+ .build();
+
+ // sink to the hoodie table using the low-level sink api.
+ HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
+ .schema(schema)
+ .partition("partition")
+ .options(options);
+
+ builder.sink(dataStream, false);
+
+ execute(execEnv, false, "Api_Sink_Test");
+ TestData.checkWrittenDataCOW(tempFile, EXPECTED);
+ }
}