This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 52524b6 [HUDI-2165] Support Transformer for HoodieFlinkStreamer (#3270)
52524b6 is described below
commit 52524b659d2cb64403e8ba87d2fefe6d536156e9
Author: vinoyang <[email protected]>
AuthorDate: Wed Jul 14 23:01:52 2021 +0800
[HUDI-2165] Support Transformer for HoodieFlinkStreamer (#3270)
* [HUDI-2165] Support Transformer for HoodieFlinkStreamer
---
.../hudi/sink/transform/ChainedTransformer.java | 51 ++++++
.../apache/hudi/sink/transform/Transformer.java | 35 ++++
.../apache/hudi/streamer/FlinkStreamerConfig.java | 6 +
.../apache/hudi/streamer/HoodieFlinkStreamer.java | 18 ++-
.../java/org/apache/hudi/util/StreamerUtil.java | 19 +++
.../org/apache/hudi/sink/StreamWriteITCase.java | 178 ++++++++++++++-------
6 files changed, 245 insertions(+), 62 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/ChainedTransformer.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/ChainedTransformer.java
new file mode 100644
index 0000000..2fe2867
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/ChainedTransformer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.transform;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link Transformer} that chains other {@link Transformer}s and applies them sequentially.
+ */
+public class ChainedTransformer implements Transformer {
+
+ private List<Transformer> transformers;
+
+ public ChainedTransformer(List<Transformer> transformers) {
+ this.transformers = transformers;
+ }
+
+ public List<String> getTransformersNames() {
+ return transformers.stream().map(t -> t.getClass().getName()).collect(Collectors.toList());
+ }
+
+ @Override
+ public DataStream<RowData> apply(DataStream<RowData> source) {
+ DataStream<RowData> dataStream = source;
+ for (Transformer t : transformers) {
+ dataStream = t.apply(dataStream);
+ }
+
+ return dataStream;
+ }
+}
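
For a sense of how the chain composes, here is a minimal sketch (mirroring the ITCase changes further down, which chain the same transformer twice); it assumes the source schema carries an int age column at field index 2, e.g. inside a test method:

    import java.util.Arrays;

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.hudi.sink.transform.ChainedTransformer;
    import org.apache.hudi.sink.transform.Transformer;

    Transformer bumpAge = ds -> ds.map(rowData -> {
      GenericRowData row = (GenericRowData) rowData;
      // Bump the assumed int age column (field index 2) by one.
      row.setField(2, row.getInt(2) + 1);
      return row;
    });
    // Each transformer consumes the previous one's output, so the two
    // copies below bump the age by two in total.
    Transformer chained = new ChainedTransformer(Arrays.asList(bumpAge, bumpAge));
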
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java
new file mode 100644
index 0000000..f40a838
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.transform;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Transform source stream to target stream before writing.
+ */
+public interface Transformer {
+
+ /**
+ * Transforms the source DataStream into the target DataStream.
+ *
+ * @param source the source DataStream to transform
+ * @return the transformed DataStream
+ */
+ DataStream<RowData> apply(DataStream<RowData> source);
+
+}
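
The --transformer-class option added below loads implementations reflectively, so a transformer meant for the command-line path should be a concrete class with a no-arg constructor rather than a lambda. A minimal sketch, where the package, class name, and field index are illustrative assumptions rather than part of this commit:

    package com.example.hudi; // illustrative package

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.sink.transform.Transformer;

    public class AgeBumpTransformer implements Transformer {

      @Override
      public DataStream<RowData> apply(DataStream<RowData> source) {
        return source.map(rowData -> {
          GenericRowData row = (GenericRowData) rowData;
          // Assumes field 2 is an int age column in the source schema.
          row.setField(2, row.getInt(2) + 1);
          return row;
        });
      }
    }
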
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index d81fd3d..8229ffa 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -117,6 +117,12 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written.")
public Boolean commitOnErrors = false;
+ @Parameter(names = {"--transformer-class"},
+ description = "A subclass or a list of subclasses of org.apache.hudi.sink.transform.Transformer"
+ + ". Allows transforming the raw source DataStream to a target DataStream (conforming to the target schema) before "
+ + "writing. Default: Not set. Pass a comma-separated list of subclass names to chain the transformations.")
+ public List<String> transformerClassNames = null;
+
/**
* Flink checkpoint interval.
*/
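
With this option the chain is assembled from the command line; a hypothetical invocation (the bundle jar and class names are illustrative, and the other required streamer options are omitted):

    flink run -c org.apache.hudi.streamer.HoodieFlinkStreamer hudi-flink-bundle.jar \
      [other required options ...] \
      --transformer-class com.example.hudi.AgeBumpTransformer,com.example.hudi.OtherTransformer

Listing several class names chains them in the given order, as handled by StreamerUtil.createTransformer below.
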
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index f8cf840..20cd833 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -19,6 +19,7 @@
package org.apache.hudi.streamer;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
@@ -30,6 +31,7 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
+import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
@@ -44,6 +46,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
@@ -85,7 +88,7 @@ public class HoodieFlinkStreamer {
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf);
- DataStream<HoodieRecord> hoodieDataStream = env.addSource(new FlinkKafkaConsumer<>(
+ DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
cfg.kafkaTopic,
new JsonRowDataDeserializationSchema(
rowType,
@@ -95,8 +98,17 @@ public class HoodieFlinkStreamer {
TimestampFormat.ISO_8601
), kafkaProps))
.name("kafka_source")
- .uid("uid_kafka_source")
- .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+ .uid("uid_kafka_source");
+
+ if (cfg.transformerClassNames != null && !cfg.transformerClassNames.isEmpty()) {
+ Option<Transformer> transformer = StreamerUtil.createTransformer(cfg.transformerClassNames);
+ if (transformer.isPresent()) {
+ dataStream = transformer.get().apply(dataStream);
+ }
+ }
+
+ DataStream<HoodieRecord> hoodieDataStream = dataStream.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.rebalance()
.transform(
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 9cbfe4f..9b3c7ac 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -28,6 +28,8 @@ import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
@@ -37,6 +39,8 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.schema.FilebasedSchemaProvider;
+import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
@@ -55,6 +59,8 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Locale;
@@ -305,4 +311,17 @@ public class StreamerUtil {
throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e);
}
}
+
+ public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
+ try {
+ List<Transformer> transformers = new ArrayList<>();
+ for (String className : Option.ofNullable(classNames).orElse(Collections.emptyList())) {
+ transformers.add(ReflectionUtils.loadClass(className));
+ }
+ return transformers.isEmpty() ? Option.empty() : Option.of(new ChainedTransformer(transformers));
+ } catch (Throwable e) {
+ throw new IOException("Could not load transformer class(es) " + classNames, e);
+ }
+ }
+
}
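
A sketch of the helper's contract under the assumptions visible above (IOException handling omitted for brevity; the class name is an illustrative placeholder):

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.sink.transform.Transformer;
    import org.apache.hudi.util.StreamerUtil;

    // A null or empty list yields Option.empty(): no transformation stage is added.
    Option<Transformer> none = StreamerUtil.createTransformer(Collections.emptyList());

    // Loadable class names are wrapped into a ChainedTransformer that applies them
    // in list order; an unloadable name surfaces as an IOException.
    Option<Transformer> chained = StreamerUtil.createTransformer(
        Arrays.asList("com.example.hudi.AgeBumpTransformer"));
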
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index be4c052..6d802fe 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -39,6 +39,8 @@ import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
+import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
@@ -66,13 +68,13 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.charset.StandardCharsets;
@@ -88,81 +90,67 @@ import java.util.concurrent.TimeUnit;
*/
public class StreamWriteITCase extends TestLogger {
- protected static final Logger LOG = LoggerFactory.getLogger(StreamWriteITCase.class);
-
private static final Map<String, List<String>> EXPECTED = new HashMap<>();
+ private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new HashMap<>();
+ private static final Map<String, List<String>> EXPECTED_CHAINED_TRANSFORMER = new HashMap<>();
static {
EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1",
"id2,par1,id2,Stephen,33,2000,par1"));
EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2",
"id4,par2,id4,Fabian,31,4000,par2"));
EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3",
"id6,par3,id6,Emma,20,6000,par3"));
EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4",
"id8,par4,id8,Han,56,8000,par4"));
+
+ EXPECTED_TRANSFORMER.put("par1", Arrays.asList("id1,par1,id1,Danny,24,1000,par1", "id2,par1,id2,Stephen,34,2000,par1"));
+ EXPECTED_TRANSFORMER.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2"));
+ EXPECTED_TRANSFORMER.put("par3", Arrays.asList("id5,par3,id5,Sophia,19,5000,par3", "id6,par3,id6,Emma,21,6000,par3"));
+ EXPECTED_TRANSFORMER.put("par4", Arrays.asList("id7,par4,id7,Bob,45,7000,par4", "id8,par4,id8,Han,57,8000,par4"));
+
+ EXPECTED_CHAINED_TRANSFORMER.put("par1", Arrays.asList("id1,par1,id1,Danny,25,1000,par1", "id2,par1,id2,Stephen,35,2000,par1"));
+ EXPECTED_CHAINED_TRANSFORMER.put("par2", Arrays.asList("id3,par2,id3,Julian,55,3000,par2", "id4,par2,id4,Fabian,33,4000,par2"));
+ EXPECTED_CHAINED_TRANSFORMER.put("par3", Arrays.asList("id5,par3,id5,Sophia,20,5000,par3", "id6,par3,id6,Emma,22,6000,par3"));
+ EXPECTED_CHAINED_TRANSFORMER.put("par4", Arrays.asList("id7,par4,id7,Bob,46,7000,par4", "id8,par4,id8,Han,58,8000,par4"));
}
@TempDir
File tempFile;
@Test
- public void testWriteToHoodie() throws Exception {
- Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
- StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- execEnv.getConfig().disableObjectReuse();
- execEnv.setParallelism(4);
- // set up checkpoint interval
- execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
- execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-
- // Read from file source
- RowType rowType =
- (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
- .getLogicalType();
- StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
- new StreamWriteOperatorFactory<>(conf);
-
- JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
- rowType,
- InternalTypeInfo.of(rowType),
- false,
- true,
- TimestampFormat.ISO_8601
- );
- String sourcePath = Objects.requireNonNull(Thread.currentThread()
- .getContextClassLoader().getResource("test_source.data")).toString();
+ public void testTransformerBeforeWriting() throws Exception {
+ Transformer transformer = (ds) -> ds.map((rowdata) -> {
+ if (rowdata instanceof GenericRowData) {
+ GenericRowData genericRD = (GenericRowData) rowdata;
+ // update the age field to age + 1
+ genericRD.setField(2, genericRD.getInt(2) + 1);
+ return genericRD;
+ } else {
+ throw new RuntimeException("Unrecognized row type information: " + rowdata.getClass().getSimpleName());
+ }
+ });
- DataStream<HoodieRecord> hoodieDataStream = execEnv
- // use continuous file source to trigger checkpoint
- .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
- .name("continuous_file_source")
- .setParallelism(1)
- .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
- .setParallelism(4)
- .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+ testWriteToHoodie(transformer, EXPECTED_TRANSFORMER);
+ }
- if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
- hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
- TypeInformation.of(HoodieRecord.class),
- new ProcessOperator<>(new BootstrapFunction<>(conf)));
- }
+ @Test
+ public void testChainedTransformersBeforeWriting() throws Exception {
+ Transformer t1 = (ds) -> ds.map((rowdata) -> {
+ if (rowdata instanceof GenericRowData) {
+ GenericRowData genericRD = (GenericRowData) rowdata;
+ // update the age field to age + 1
+ genericRD.setField(2, genericRD.getInt(2) + 1);
+ return genericRD;
+ } else {
+ throw new RuntimeException("Unrecognized row type: " + rowdata.getClass().getSimpleName());
+ }
+ });
- DataStream<Object> pipeline = hoodieDataStream
- // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
- .keyBy(HoodieRecord::getRecordKey)
- .transform(
- "bucket_assigner",
- TypeInformation.of(HoodieRecord.class),
- new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
- .uid("uid_bucket_assigner")
- // shuffle by fileId(bucket id)
- .keyBy(record -> record.getCurrentLocation().getFileId())
- .transform("hoodie_stream_write", TypeInformation.of(Object.class),
operatorFactory)
- .uid("uid_hoodie_stream_write");
- execEnv.addOperator(pipeline.getTransformation());
+ ChainedTransformer chainedTransformer = new ChainedTransformer(Arrays.asList(t1, t1));
- JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
- // wait for the streaming job to finish
- client.getJobExecutionResult().get();
+ testWriteToHoodie(chainedTransformer, EXPECTED_CHAINED_TRANSFORMER);
+ }
- TestData.checkWrittenFullData(tempFile, EXPECTED);
+ @Test
+ public void testWriteToHoodieWithoutTransformer() throws Exception {
+ testWriteToHoodie(null, EXPECTED);
}
@Test
@@ -328,4 +316,76 @@ public class StreamWriteITCase extends TestLogger {
TestData.checkWrittenFullData(tempFile, EXPECTED);
}
+
+ private void testWriteToHoodie(
+ Transformer transformer,
+ Map<String, List<String>> expected) throws Exception {
+
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.getConfig().disableObjectReuse();
+ execEnv.setParallelism(4);
+ // set up checkpoint interval
+ execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+ execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+ // Read from file source
+ RowType rowType =
+ (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ .getLogicalType();
+ StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
+ new StreamWriteOperatorFactory<>(conf);
+
+ JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+ rowType,
+ InternalTypeInfo.of(rowType),
+ false,
+ true,
+ TimestampFormat.ISO_8601
+ );
+ String sourcePath = Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("test_source.data")).toString();
+
+ DataStream<RowData> dataStream = execEnv
+ // use continuous file source to trigger checkpoint
+ .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
+ .name("continuous_file_source")
+ .setParallelism(1)
+ .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+ .setParallelism(4);
+
+ if (transformer != null) {
+ dataStream = transformer.apply(dataStream);
+ }
+
+ DataStream<HoodieRecord> hoodieDataStream = dataStream
+ .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+
+ if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
+ TypeInformation.of(HoodieRecord.class),
+ new ProcessOperator<>(new BootstrapFunction<>(conf)));
+ }
+
+ DataStream<Object> pipeline = hoodieDataStream
+ // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
+ .keyBy(HoodieRecord::getRecordKey)
+ .transform(
+ "bucket_assigner",
+ TypeInformation.of(HoodieRecord.class),
+ new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
+ .uid("uid_bucket_assigner")
+ // shuffle by fileId(bucket id)
+ .keyBy(record -> record.getCurrentLocation().getFileId())
+ .transform("hoodie_stream_write", TypeInformation.of(Object.class),
operatorFactory)
+ .uid("uid_hoodie_stream_write");
+ execEnv.addOperator(pipeline.getTransformation());
+
+ JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
+ // wait for the streaming job to finish
+ client.getJobExecutionResult().get();
+
+ TestData.checkWrittenFullData(tempFile, expected);
+
+ }
}