This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 74f8d94b73b [HUDI-5331] Add schema settings with stream api (#7384)
74f8d94b73b is described below

commit 74f8d94b73bebe7ed67b2febf4f4a587eefee12a
Author: superche <[email protected]>
AuthorDate: Tue Dec 6 18:20:47 2022 +0800

    [HUDI-5331] Add schema settings with stream api (#7384)
    
    * Add schema set with stream api.
    
    Co-authored-by: superche <[email protected]>
---
 .../java/org/apache/hudi/util/HoodiePipeline.java  |  15 +++
 .../apache/hudi/sink/ITTestDataStreamWrite.java    | 127 +++++++++++++++++++++
 2 files changed, 142 insertions(+)
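
For context, a minimal usage sketch of the new Builder#schema(Schema) entry point, mirroring the sink test in the patch below. The table name, write path, and option values are illustrative assumptions, not part of the commit:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.HoodiePipeline;

    import java.util.HashMap;
    import java.util.Map;

    // Define the table schema with the Flink Schema builder.
    Schema schema = Schema.newBuilder()
        .column("uuid", DataTypes.STRING().notNull())
        .column("name", DataTypes.STRING())
        .column("ts", DataTypes.TIMESTAMP(3))
        .column("partition", DataTypes.STRING())
        .primaryKey("uuid")
        .build();

    // Hypothetical table path, for illustration only.
    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "/tmp/hoodie/t1");

    HoodiePipeline.Builder builder = HoodiePipeline.builder("t1")
        .schema(schema)            // new in this commit: derives column() and pk() calls from the Schema
        .partition("partition")
        .options(options);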

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
index f95367c8361..61ec2fccaa0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.util;
 
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.hudi.adapter.Utils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieTableFactory;
@@ -125,6 +127,19 @@ public class HoodiePipeline {
       return this;
     }
 
+    /**
+     * Add table schema.
+     */
+    public Builder schema(Schema schema) {
+      for (Schema.UnresolvedColumn column : schema.getColumns()) {
+        column(column.toString());
+      }
+      if (schema.getPrimaryKey().isPresent()) {
+        pk(schema.getPrimaryKey().get().getColumnNames().stream().map(EncodingUtils::escapeIdentifier).collect(Collectors.joining(", ")));
+      }
+      return this;
+    }
+
     /**
      * Add a config option.
      */
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 6ab4b1b6e0d..ad5642924da 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -22,10 +22,13 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
+import org.apache.hudi.table.catalog.HoodieCatalog;
+import org.apache.hudi.table.catalog.TableOptionProperties;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.HoodiePipeline;
 import org.apache.hudi.util.StreamerUtil;
@@ -49,6 +52,10 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -70,6 +77,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+
 /**
  * Integration test for Flink Hoodie stream sink.
  */
@@ -427,4 +437,121 @@ public class ITTestDataStreamWrite extends TestLogger {
     execute(execEnv, false, "Api_Sink_Test");
     TestData.checkWrittenDataCOW(tempFile, EXPECTED);
   }
+
+  @Test
+  public void testHoodiePipelineBuilderSourceWithSchemaSet() throws Exception {
+    //create a StreamExecutionEnvironment instance.
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(1);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    // create table dir
+    final String dbName = DEFAULT_DATABASE.defaultValue();
+    final String tableName = "t1";
+    File testTable = new File(tempFile, dbName + Path.SEPARATOR + tableName);
+    testTable.mkdir();
+
+    Configuration conf = TestConfigurations.getDefaultConf(testTable.toURI().toString());
+    conf.setString(FlinkOptions.TABLE_NAME, tableName);
+    conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
+
+    // write 3 batches of data set
+    TestData.writeData(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeData(TestData.dataSetInsert(3, 4), conf);
+    TestData.writeData(TestData.dataSetInsert(5, 6), conf);
+
+    String latestCommit = TestUtils.getLastCompleteInstant(testTable.toURI().toString());
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), testTable.toURI().toString());
+    options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);
+
+    // create hoodie catalog, in order to get the table schema
+    Configuration catalogConf = new Configuration();
+    catalogConf.setString(CATALOG_PATH.key(), tempFile.toURI().toString());
+    catalogConf.setString(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
+    HoodieCatalog catalog = new HoodieCatalog("hudi", catalogConf);
+    catalog.open();
+    // get hoodieTable
+    ObjectPath tablePath = new ObjectPath(dbName, tableName);
+    TableOptionProperties.createProperties(testTable.toURI().toString(), HadoopConfigurations.getHadoopConf(catalogConf), options);
+    CatalogBaseTable hoodieTable = catalog.getTable(tablePath);
+
+    // read the hoodie table using the low-level source api.
+    HoodiePipeline.Builder builder = HoodiePipeline.builder("test_source")
+        .schema(hoodieTable.getUnresolvedSchema())
+        .pk("uuid")
+        .partition("partition")
+        .options(options);
+    DataStream<RowData> rowDataDataStream = builder.source(execEnv);
+    List<RowData> result = new ArrayList<>();
+    rowDataDataStream.executeAndCollect().forEachRemaining(result::add);
+    TimeUnit.SECONDS.sleep(2); // sleep 2 seconds to collect data
+    TestData.assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
+  }
+
+  @Test
+  public void testHoodiePipelineBuilderSinkWithSchemaSet() throws Exception {
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    Map<String, String> options = new HashMap<>();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(4);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
+    options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+    Configuration conf = Configuration.fromMap(options);
+    // Read from file source
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        InternalTypeInfo.of(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+    format.setFilesFilter(FilePathFilter.createDefaultFilter());
+    format.setCharsetName("UTF-8");
+
+    DataStream dataStream = execEnv
+        // use continuous file source to trigger checkpoint
+        .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
+        .name("continuous_file_source")
+        .setParallelism(1)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(4);
+
+    Schema schema =
+        Schema.newBuilder()
+            .column("uuid", DataTypes.STRING().notNull())
+            .column("name", DataTypes.STRING())
+            .column("age", DataTypes.INT())
+            .column("ts", DataTypes.TIMESTAMP(3))
+            .column("partition", DataTypes.STRING())
+            .primaryKey("uuid")
+            .build();
+
+    // sink to the hoodie table using the low-level sink api.
+    HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
+        .schema(schema)
+        .partition("partition")
+        .options(options);
+
+    builder.sink(dataStream, false);
+
+    execute(execEnv, false, "Api_Sink_Test");
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED);
+  }
 }
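
For reference, the Builder#schema(Schema) method added above forwards each UnresolvedColumn's string form to column() and the escaped primary-key column names to pk(). For the sample schema in the sink test it is therefore roughly equivalent to the hand-written configuration below; the exact column strings depend on Schema.UnresolvedColumn#toString, so they are shown here as an assumption:

    HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
        .column("`uuid` STRING NOT NULL")
        .column("`name` STRING")
        .column("`age` INT")
        .column("`ts` TIMESTAMP(3)")
        .column("`partition` STRING")
        .pk("`uuid`")              // identifiers escaped via EncodingUtils.escapeIdentifier
        .partition("partition")
        .options(options);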
