This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 52524b6  [HUDI-2165] Support Transformer for HoodieFlinkStreamer (#3270)
52524b6 is described below

commit 52524b659d2cb64403e8ba87d2fefe6d536156e9
Author: vinoyang <[email protected]>
AuthorDate: Wed Jul 14 23:01:52 2021 +0800

    [HUDI-2165] Support Transformer for HoodieFlinkStreamer (#3270)
    
    * [HUDI-2165] Support Transformer for HoodieFlinkStreamer
---
 .../hudi/sink/transform/ChainedTransformer.java    |  51 ++++++
 .../apache/hudi/sink/transform/Transformer.java    |  35 ++++
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |   6 +
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |  18 ++-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  19 +++
 .../org/apache/hudi/sink/StreamWriteITCase.java    | 178 ++++++++++++++-------
 6 files changed, 245 insertions(+), 62 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/ChainedTransformer.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/ChainedTransformer.java
new file mode 100644
index 0000000..2fe2867
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/ChainedTransformer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.transform;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link Transformer} that chains other {@link Transformer}s and applies them sequentially.
+ */
+public class ChainedTransformer implements Transformer {
+
+  private List<Transformer> transformers;
+
+  public ChainedTransformer(List<Transformer> transformers) {
+    this.transformers = transformers;
+  }
+
+  public List<String> getTransformersNames() {
+    return transformers.stream().map(t -> t.getClass().getName()).collect(Collectors.toList());
+  }
+
+  @Override
+  public DataStream<RowData> apply(DataStream<RowData> source) {
+    DataStream<RowData> dataStream = source;
+    for (Transformer t : transformers) {
+      dataStream = t.apply(dataStream);
+    }
+
+    return dataStream;
+  }
+}
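
A note on semantics for reviewers: the transformers are applied in list order, so new ChainedTransformer(Arrays.asList(t1, t2)).apply(source) is equivalent to t2.apply(t1.apply(source)). A minimal sketch, not part of this commit (both lambdas are hypothetical pass-through stages):

    Transformer t1 = ds -> ds.map(row -> row);      // identity map stage, for illustration
    Transformer t2 = ds -> ds.filter(row -> true);  // pass-through filter stage
    DataStream<RowData> out = new ChainedTransformer(Arrays.asList(t1, t2)).apply(source);
    // out is the same pipeline as t2.apply(t1.apply(source))
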
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java
new file mode 100644
index 0000000..f40a838
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.transform;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Transform source stream to target stream before writing.
+ */
+public interface Transformer {
+
+  /**
+   * Transforms the source DataStream into the target DataStream.
+   * @param source the source DataStream to transform
+   */
+  DataStream<RowData> apply(DataStream<RowData> source);
+
+}
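
For anyone trying the interface out: a standalone implementation might look like the sketch below. It is not part of this commit; the class name is hypothetical, and it assumes the age field sits at position 2, as in the test schema later in this diff:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;

    // Hypothetical example: increments the age field of every row.
    public class AgeIncrementTransformer implements Transformer {
      @Override
      public DataStream<RowData> apply(DataStream<RowData> source) {
        return source.map(rowData -> {
          GenericRowData row = (GenericRowData) rowData;
          row.setField(2, row.getInt(2) + 1); // age -> age + 1
          return (RowData) row;
        });
      }
    }
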
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index d81fd3d..8229ffa 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -117,6 +117,12 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written.")
   public Boolean commitOnErrors = false;
 
+  @Parameter(names = {"--transformer-class"},
+      description = "A subclass or a list of subclasses of org.apache.hudi.sink.transform.Transformer"
+          + ". Allows transforming raw source DataStream to a target DataStream (conforming to target schema) before "
+          + "writing. Default: Not set. Pass a comma-separated list of subclass names to chain the transformations.")
+  public List<String> transformerClassNames = null;
+
   /**
    * Flink checkpoint interval.
    */
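
With the new flag, a streamer invocation that chains two transformers might look like the following. The transformer class names are hypothetical; the other flag spellings are taken from FlinkStreamerConfig and should be verified against the version in use:

    flink run -c org.apache.hudi.streamer.HoodieFlinkStreamer hudi-flink-bundle.jar \
      --kafka-topic hudi_source \
      --kafka-group-id hudi_group \
      --kafka-bootstrap-servers localhost:9092 \
      --target-base-path file:///tmp/hudi_table \
      --target-table hudi_table \
      --transformer-class com.example.AgeIncrementTransformer,com.example.DropInvalidRowsTransformer
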
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index f8cf840..20cd833 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.streamer;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperatorFactory;
@@ -30,6 +31,7 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
+import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -44,6 +46,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -85,7 +88,7 @@ public class HoodieFlinkStreamer {
     StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
         new StreamWriteOperatorFactory<>(conf);
 
-    DataStream<HoodieRecord> hoodieDataStream = env.addSource(new FlinkKafkaConsumer<>(
+    DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
         cfg.kafkaTopic,
         new JsonRowDataDeserializationSchema(
             rowType,
@@ -95,8 +98,17 @@ public class HoodieFlinkStreamer {
             TimestampFormat.ISO_8601
         ), kafkaProps))
         .name("kafka_source")
-        .uid("uid_kafka_source")
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+        .uid("uid_kafka_source");
+
+    if (cfg.transformerClassNames != null && !cfg.transformerClassNames.isEmpty()) {
+      Option<Transformer> transformer = StreamerUtil.createTransformer(cfg.transformerClassNames);
+      if (transformer.isPresent()) {
+        dataStream = transformer.get().apply(dataStream);
+      }
+    }
+
+    DataStream<HoodieRecord> hoodieDataStream = dataStream.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+
     if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
       hoodieDataStream = hoodieDataStream.rebalance()
           .transform(
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 9cbfe4f..9b3c7ac 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -28,6 +28,8 @@ import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
@@ -37,6 +39,8 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.schema.FilebasedSchemaProvider;
+import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 
@@ -55,6 +59,8 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.StringReader;
 import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Locale;
@@ -305,4 +311,17 @@ public class StreamerUtil {
       throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e);
     }
   }
+
+  public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
+    try {
+      List<Transformer> transformers = new ArrayList<>();
+      for (String className : Option.ofNullable(classNames).orElse(Collections.emptyList())) {
+        transformers.add(ReflectionUtils.loadClass(className));
+      }
+      return transformers.isEmpty() ? Option.empty() : Option.of(new ChainedTransformer(transformers));
+    } catch (Throwable e) {
+      throw new IOException("Could not load transformer class(es) " + classNames, e);
+    }
+  }
+
+
 }
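
For context on how this helper behaves: passing null or an empty list yields Option.empty(), and multiple class names are wrapped into a single ChainedTransformer. A minimal calling sketch, mirroring the HoodieFlinkStreamer wiring above (the class name is hypothetical):

    Option<Transformer> transformer = StreamerUtil.createTransformer(
        Arrays.asList("com.example.AgeIncrementTransformer"));
    if (transformer.isPresent()) {
      dataStream = transformer.get().apply(dataStream); // apply before mapping to HoodieRecord
    }
    // StreamerUtil.createTransformer(null) returns Option.empty()
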
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index be4c052..6d802fe 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -39,6 +39,8 @@ import org.apache.hudi.sink.compact.FlinkCompactionConfig;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
+import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.CompactionUtil;
@@ -66,13 +68,13 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.TestLogger;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
@@ -88,81 +90,67 @@ import java.util.concurrent.TimeUnit;
  */
 public class StreamWriteITCase extends TestLogger {
 
-  protected static final Logger LOG = LoggerFactory.getLogger(StreamWriteITCase.class);
-
   private static final Map<String, List<String>> EXPECTED = new HashMap<>();
+  private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new HashMap<>();
+  private static final Map<String, List<String>> EXPECTED_CHAINED_TRANSFORMER = new HashMap<>();
 
   static {
     EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", 
"id2,par1,id2,Stephen,33,2000,par1"));
     EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", 
"id4,par2,id4,Fabian,31,4000,par2"));
     EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", 
"id6,par3,id6,Emma,20,6000,par3"));
     EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", 
"id8,par4,id8,Han,56,8000,par4"));
+
+    EXPECTED_TRANSFORMER.put("par1", 
Arrays.asList("id1,par1,id1,Danny,24,1000,par1", 
"id2,par1,id2,Stephen,34,2000,par1"));
+    EXPECTED_TRANSFORMER.put("par2", 
Arrays.asList("id3,par2,id3,Julian,54,3000,par2", 
"id4,par2,id4,Fabian,32,4000,par2"));
+    EXPECTED_TRANSFORMER.put("par3", 
Arrays.asList("id5,par3,id5,Sophia,19,5000,par3", 
"id6,par3,id6,Emma,21,6000,par3"));
+    EXPECTED_TRANSFORMER.put("par4", 
Arrays.asList("id7,par4,id7,Bob,45,7000,par4", 
"id8,par4,id8,Han,57,8000,par4"));
+
+    EXPECTED_CHAINED_TRANSFORMER.put("par1", 
Arrays.asList("id1,par1,id1,Danny,25,1000,par1", 
"id2,par1,id2,Stephen,35,2000,par1"));
+    EXPECTED_CHAINED_TRANSFORMER.put("par2", 
Arrays.asList("id3,par2,id3,Julian,55,3000,par2", 
"id4,par2,id4,Fabian,33,4000,par2"));
+    EXPECTED_CHAINED_TRANSFORMER.put("par3", 
Arrays.asList("id5,par3,id5,Sophia,20,5000,par3", 
"id6,par3,id6,Emma,22,6000,par3"));
+    EXPECTED_CHAINED_TRANSFORMER.put("par4", 
Arrays.asList("id7,par4,id7,Bob,46,7000,par4", 
"id8,par4,id8,Han,58,8000,par4"));
   }
 
   @TempDir
   File tempFile;
 
   @Test
-  public void testWriteToHoodie() throws Exception {
-    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
-    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-    execEnv.getConfig().disableObjectReuse();
-    execEnv.setParallelism(4);
-    // set up checkpoint interval
-    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
-    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-
-    // Read from file source
-    RowType rowType =
-        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
-            .getLogicalType();
-    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
-        new StreamWriteOperatorFactory<>(conf);
-
-    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
-        rowType,
-        InternalTypeInfo.of(rowType),
-        false,
-        true,
-        TimestampFormat.ISO_8601
-    );
-    String sourcePath = Objects.requireNonNull(Thread.currentThread()
-        .getContextClassLoader().getResource("test_source.data")).toString();
+  public void testTransformerBeforeWriting() throws Exception {
+    Transformer transformer = (ds) -> ds.map((rowdata) -> {
+      if (rowdata instanceof GenericRowData) {
+        GenericRowData genericRD = (GenericRowData) rowdata;
+        // update age field to age + 1
+        genericRD.setField(2, genericRD.getInt(2) + 1);
+        return genericRD;
+      } else {
+        throw new RuntimeException("Unrecognized row type information: " + rowdata.getClass().getSimpleName());
+      }
+    });
 
-    DataStream<HoodieRecord> hoodieDataStream = execEnv
-        // use continuous file source to trigger checkpoint
-        .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
-        .name("continuous_file_source")
-        .setParallelism(1)
-        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-        .setParallelism(4)
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+    testWriteToHoodie(transformer, EXPECTED_TRANSFORMER);
+  }
 
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
-          TypeInformation.of(HoodieRecord.class),
-          new ProcessOperator<>(new BootstrapFunction<>(conf)));
-    }
+  @Test
+  public void testChainedTransformersBeforeWriting() throws Exception {
+    Transformer t1 = (ds) -> ds.map((rowdata) -> {
+      if (rowdata instanceof GenericRowData) {
+        GenericRowData genericRD = (GenericRowData) rowdata;
+        //update age field to age + 1
+        genericRD.setField(2, genericRD.getInt(2) + 1);
+        return genericRD;
+      } else {
+        throw new RuntimeException("Unrecognized row type : " + 
rowdata.getClass().getSimpleName());
+      }
+    });
 
-    DataStream<Object> pipeline = hoodieDataStream
-        // Key-by record key, to avoid multiple subtasks write to a bucket at 
the same time
-        .keyBy(HoodieRecord::getRecordKey)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .uid("uid_bucket_assigner")
-        // shuffle by fileId(bucket id)
-        .keyBy(record -> record.getCurrentLocation().getFileId())
-        .transform("hoodie_stream_write", TypeInformation.of(Object.class), 
operatorFactory)
-        .uid("uid_hoodie_stream_write");
-    execEnv.addOperator(pipeline.getTransformation());
+    ChainedTransformer chainedTransformer = new 
ChainedTransformer(Arrays.asList(t1, t1));
 
-    JobClient client = 
execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
-    // wait for the streaming job to finish
-    client.getJobExecutionResult().get();
+    testWriteToHoodie(chainedTransformer, EXPECTED_CHAINED_TRANSFORMER);
+  }
 
-    TestData.checkWrittenFullData(tempFile, EXPECTED);
+  @Test
+  public void testWriteToHoodieWithoutTransformer() throws Exception {
+    testWriteToHoodie(null, EXPECTED);
   }
 
   @Test
@@ -328,4 +316,76 @@ public class StreamWriteITCase extends TestLogger {
 
     TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
+
+  private void testWriteToHoodie(
+      Transformer transformer,
+      Map<String, List<String>> expected) throws Exception {
+
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(4);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    // Read from file source
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
+        new StreamWriteOperatorFactory<>(conf);
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        InternalTypeInfo.of(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    DataStream<RowData> dataStream = execEnv
+        // use continuous file source to trigger checkpoint
+        .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
+        .name("continuous_file_source")
+        .setParallelism(1)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(4);
+
+    if (transformer != null) {
+      dataStream = transformer.apply(dataStream);
+    }
+
+    DataStream<HoodieRecord> hoodieDataStream = dataStream
+        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+
+    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
+          TypeInformation.of(HoodieRecord.class),
+          new ProcessOperator<>(new BootstrapFunction<>(conf)));
+    }
+
+    DataStream<Object> pipeline = hoodieDataStream
+        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
+        .keyBy(HoodieRecord::getRecordKey)
+        .transform(
+            "bucket_assigner",
+            TypeInformation.of(HoodieRecord.class),
+            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
+        .uid("uid_bucket_assigner")
+        // shuffle by fileId(bucket id)
+        .keyBy(record -> record.getCurrentLocation().getFileId())
+        .transform("hoodie_stream_write", TypeInformation.of(Object.class), 
operatorFactory)
+        .uid("uid_hoodie_stream_write");
+    execEnv.addOperator(pipeline.getTransformation());
+
+    JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
+    // wait for the streaming job to finish
+    client.getJobExecutionResult().get();
+
+    TestData.checkWrittenFullData(tempFile, expected);
+
+  }
 }
