This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 36a9113aca2 [HUDI-7929] Create k8s example for flink hudi integration (#11570)
36a9113aca2 is described below
commit 36a9113aca280aed206dcfbdd317ccfb6b6a6382
Author: Peter Huang <[email protected]>
AuthorDate: Sun Jul 7 21:16:34 2024 -0700
[HUDI-7929] Create k8s example for flink hudi integration (#11570)
---
hudi-examples/hudi-examples-k8s/Dockerfile | 31 ++++
hudi-examples/hudi-examples-k8s/README.md | 113 ++++++++++++
.../hudi-examples-k8s/config/hadoop/core-site.xml | 62 +++++++
.../config/k8s/flink-deployment.yml | 59 ++++++
.../config/k8s/minio-standalone.yaml | 83 +++++++++
hudi-examples/hudi-examples-k8s/pom.xml | 200 +++++++++++++++++++++
.../k8s/quickstart/HudiDataStreamWriter.java | 170 ++++++++++++++++++
.../k8s/quickstart/utils/DataGenerator.java | 79 ++++++++
hudi-examples/pom.xml | 1 +
9 files changed, 798 insertions(+)
diff --git a/hudi-examples/hudi-examples-k8s/Dockerfile b/hudi-examples/hudi-examples-k8s/Dockerfile
new file mode 100644
index 00000000000..a3a7e7889f8
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/Dockerfile
@@ -0,0 +1,31 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+FROM flink:1.18
+
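+# Enable Flink's S3 filesystem plugin (flink-s3-fs-hadoop) so the job can use s3a:// paths for checkpoints and data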
+RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
+RUN mv -v /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop
+RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-hadoop
+
+ENV FLINK_HADOOP_CONF /etc/hadoop/conf
+RUN mkdir -p $FLINK_HADOOP_CONF
+COPY config/hadoop/core-site.xml $FLINK_HADOOP_CONF
+ENV HADOOP_CLASSPATH=$FLINK_HADOOP_CONF
+WORKDIR /opt/hudi/examples
+COPY target/hudi-examples-k8s-*-SNAPSHOT.jar streaming/hudi-examples-k8s.jar
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/README.md b/hudi-examples/hudi-examples-k8s/README.md
new file mode 100644
index 00000000000..96371b6d3eb
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/README.md
@@ -0,0 +1,113 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Quick Start
+
+## Prerequisites
+
+Install the Helm chart for the Flink Kubernetes operator. See [Flink on Kubernetes](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/) for more details.
+
+Make sure your local Docker is assigned at least 8 cores and 8 GB of memory.
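+
+For example, with minikube you can allocate these resources at start-up (the values here simply mirror the requirement above):
+```sh
+minikube start --cpus 8 --memory 8g
+```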
+
+## Setting up a docker registry
+
+- Point the Docker CLI at the Docker daemon running in minikube
+ ```sh
+ eval $(minikube -p minikube docker-env)
+ ```
+
+- Start a local Docker registry in minikube's Docker (the `docker` command now talks to minikube's Docker daemon)
+ ```sh
+ docker run -d -p 5001:5000 --restart=always --name registry registry:2
+ ```
+  This maps port `5001` on the minikube node to the registry's port `5000`; you can choose any available port.
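+  You can sanity-check that the registry is up. Reachability from the host depends on your minikube driver, so this check goes through `minikube ssh`:
+  ```sh
+  minikube ssh -- curl -s http://localhost:5001/v2/_catalog
+  ```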
+
+## Building the Hudi Flink Docker Image
+
+We need to mount a local path from the host so that all containers in minikube can work with it.
+
+```shell
+mkdir /tmp/minikubedata
+minikube mount /tmp/minikubedata:/data &
+```
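+
+To confirm the mount is active, you can list the shared path from inside the minikube VM:
+```shell
+minikube ssh -- ls /data
+```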
+
+We need a custom Docker image that extends the Flink base image and adds the Hudi Flink example jar to it.
+You can build this Docker image by running the following command:
+```shell
+docker build -t localhost:5001/hudi/hudi-flink .
+```
+
+Because the Docker CLI points at minikube's Docker daemon, the image is now available inside minikube.
+You can verify this by running
+```shell
+docker images
+```
+
+This should show the image `localhost:5001/hudi/hudi-flink` in the list of images.
+```
+REPOSITORY                       TAG       IMAGE ID       CREATED          SIZE
+localhost:5001/hudi/hudi-flink   latest    87b936181d74   32 minutes ago   1.08GB
+```
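+
+Depending on how your minikube cluster resolves images, you may also need to push the image to the local registry (with the `docker-env` setup above, the image is usually already available from minikube's image cache):
+```shell
+docker push localhost:5001/hudi/hudi-flink
+```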
+
+## Start Minio server
+
+Create a Minio server in Kubernetes.
+
+```shell
+kubectl apply -f config/k8s/minio-standalone.yaml
+```
+
+This will create a pod in Kubernetes that runs the Minio server.
+You can verify this by running
+```shell
+kubectl port-forward svc/minio-svc 9090
+```
+and then opening the Minio UI in your browser at `http://localhost:9090`.
+
+### Prepare the testing bucket
+
+Use the default credentials (`minioadmin` / `minioadmin`) to log into the console.
+
+From the UI, manually create a bucket named `test`.
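+
+Alternatively, if you have the MinIO client (`mc`) installed, you can create the bucket from the command line (this assumes a `kubectl port-forward svc/minio-svc 9000` tunnel to the API port is running):
+```shell
+mc alias set localminio http://localhost:9000 minioadmin minioadmin
+mc mb localminio/test
+```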
+
+### Destroy or replace Minio server
+
+```shell
+kubectl delete deployment minio-deploy
+kubectl replace -f config/k8s/minio-standalone.yaml
+```
+
+## Run the Flink Job
+
+We can now submit the Flink job to the Flink cluster running in Kubernetes.
+```shell
+kubectl apply -f config/k8s/flink-deployment.yml
+```
+
+This will create a FlinkDeployment resource; the Flink Kubernetes operator then starts the pods that run the Flink job.
+You can verify this by running
+```shell
+kubectl port-forward svc/basic-example-rest 8081
+```
+and then opening the Flink UI in your browser at `http://localhost:8081`.
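+
+You can also ask the operator for the job status (the exact status fields depend on the operator version):
+```shell
+kubectl get flinkdeployment basic-example -o jsonpath='{.status.jobStatus.state}'
+```
+
+Once the job has been running for a while, Hudi table files should appear under `s3a://test/hudi`. Assuming the `mc` alias created in the Minio section, you can list them with:
+```shell
+mc ls --recursive localminio/test/hudi/
+```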
+
+### Destroy or replace Flink app
+
+```shell
+kubectl delete flinkdeployment basic-example
+kubectl replace -f config/k8s/flink-deployment.yml
+```
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/config/hadoop/core-site.xml b/hudi-examples/hudi-examples-k8s/config/hadoop/core-site.xml
new file mode 100644
index 00000000000..90e3ef5bb18
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/config/hadoop/core-site.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+
+ <property>
+ <name>fs.s3a.impl</name>
+ <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+ </property>
+
+ <!-- Comma separated list of local directories used to buffer
+ large results prior to transmitting them to S3. -->
+ <property>
+ <name>fs.s3a.buffer.dir</name>
+ <value>/tmp</value>
+ </property>
+
+ <property>
+ <name>fs.s3a.endpoint</name>
+ <value>http://minio-svc:9000</value>
+ </property>
+
+ <property>
+ <name>fs.s3a.path.style.access</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.s3a.access.key</name>
+ <description>AWS access key ID.
+ Omit for IAM role-based or provider-based authentication.</description>
+ <value>minioadmin</value>
+ </property>
+
+ <property>
+ <name>fs.s3a.secret.key</name>
+ <description>AWS secret key.
+ Omit for IAM role-based or provider-based authentication.</description>
+ <value>minioadmin</value>
+ </property>
+
+ <property>
+ <name>fs.s3a.aws.credentials.provider</name>
+ <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
+ </property>
+
+</configuration>
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/config/k8s/flink-deployment.yml b/hudi-examples/hudi-examples-k8s/config/k8s/flink-deployment.yml
new file mode 100644
index 00000000000..733d2d04d7c
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/config/k8s/flink-deployment.yml
@@ -0,0 +1,59 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: basic-example
+spec:
+  image: localhost:5001/hudi/hudi-flink:latest
+  flinkVersion: v1_18
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "1"
+    execution.checkpointing.checkpoints-after-tasks-finish.enabled: "true"
+    state.backend.type: filesystem
+    state.savepoints.dir: s3a://test/savepoints/
+    state.checkpoints.dir: s3a://test/checkpoints/
+    s3.access-key: minioadmin
+    s3.secret-key: minioadmin
+    s3.endpoint: http://minio-svc:9000
+    s3.path.style.access: "true"
+    s3a.access-key: minioadmin
+    s3a.secret-key: minioadmin
+    s3a.endpoint: http://minio-svc:9000
+    s3a.path.style.access: "true"
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "1g"
+      cpu: 1
+    podTemplate:
+      spec:
+        containers:
+          - name: flink-main-container
+            env:
+              - name: TARGET_S3_PATH
+                value: 's3a://test/hudi'
+  taskManager:
+    resource:
+      memory: "1g"
+      cpu: 1
+  job:
+    jarURI: local:///opt/hudi/examples/streaming/hudi-examples-k8s.jar
+    parallelism: 2
+    upgradeMode: stateless
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/config/k8s/minio-standalone.yaml b/hudi-examples/hudi-examples-k8s/config/k8s/minio-standalone.yaml
new file mode 100644
index 00000000000..d04ec46f307
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/config/k8s/minio-standalone.yaml
@@ -0,0 +1,83 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  creationTimestamp: null
+  labels:
+    app: minio
+  name: minio-deploy
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: minio
+  strategy: {}
+  template:
+    metadata:
+      creationTimestamp: null
+      labels:
+        app: minio
+    spec:
+      volumes:
+      - name: hostvolume
+        hostPath:
+          path: /tmp/minikubedata
+          type: DirectoryOrCreate
+      initContainers:
+      - name: prepare
+        image: busybox:1.28
+        command: ['sh', '-c', 'mkdir -p /data/minio/ && chown 9999 /data/minio/']
+        volumeMounts:
+        - mountPath: /data
+          name: hostvolume
+      containers:
+      - name: minio
+        image: quay.io/minio/minio:RELEASE.2024-01-13T07-53-03Z
+        command:
+        - /bin/bash
+        - -c
+        args:
+        - minio server /data/minio --address :9000 --console-address :9090
+        volumeMounts:
+        - mountPath: /data
+          name: hostvolume
+status: {}
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app: minio
+  name: minio-svc
+spec:
+  ports:
+  - name: webconsole
+    port: 9090
+    protocol: TCP
+    targetPort: 9090
+  - name: api
+    port: 9000
+    protocol: TCP
+    targetPort: 9000
+  selector:
+    app: minio
+status:
+  loadBalancer: {}
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/pom.xml b/hudi-examples/hudi-examples-k8s/pom.xml
new file mode 100644
index 00000000000..82870562f92
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/pom.xml
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>hudi-examples</artifactId>
+ <groupId>org.apache.hudi</groupId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>hudi-examples-k8s</artifactId>
+
+ <properties>
+ <main.basedir>${project.parent.basedir}</main.basedir>
+ <checkstyle.skip>true</checkstyle.skip>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${maven-shade-plugin.version}</version>
+ <executions>
+ <!-- Run shade goal on package phase -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ <excludes>
+ <exclude>org.apache.hbase.thirdparty:*</exclude>
+ <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>org.apache.logging.log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <!-- Do not copy the signatures in the META-INF folder.
+ Otherwise, this might cause SecurityExceptions when using the JAR. -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.hudi.examples.k8s.quickstart.HudiDataStreamWriter</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ </plugins>
+
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
+
+ <dependencies>
+ <!-- Hoodie -->
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-flink${flink.bundle.version}-bundle</artifactId>
+ <scope>compile</scope>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Flink -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${flink.streaming.java.artifactId}</artifactId>
+ <scope>provided</scope>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${flink.clients.artifactId}</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.esotericsoftware.minlog</groupId>
+ <artifactId>minlog</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${flink.table.runtime.artifactId}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Hadoop -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>runtime</scope>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>runtime</scope>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <scope>runtime</scope>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>runtime</scope>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <scope>runtime</scope>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-tests-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/HudiDataStreamWriter.java b/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/HudiDataStreamWriter.java
new file mode 100644
index 00000000000..e79db5337c5
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/HudiDataStreamWriter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.k8s.quickstart;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.examples.k8s.quickstart.utils.DataGenerator;
+import org.apache.hudi.util.HoodiePipeline;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This Flink program demonstrates inserting, updating, and deleting records in a Hudi table using the DataStream API.
+ * The program emits four messages per batch for ten batches. Two of the messages carry a random UUID, acting as new
+ * insert records, while two reuse the same record keys, resulting in an update of those two records in each batch.
+ * In the first batch, four new records are inserted into a newly created Hudi table.
+ * After each subsequent batch, two new records are inserted, so the record count grows by two per batch.
+ * In the 11th batch, the delete operation is illustrated by publishing a record with the DELETE row kind; as a result,
+ * the third record is deleted after this batch.
+ *
+ * After the program finishes, you should see a total of 21 records in the Hudi table.
+ */
+public class HudiDataStreamWriter {
+
+  public static final DataType ROW_DATA_TYPE = DataTypes.ROW(
+      DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),  // precombine field
+      DataTypes.FIELD("uuid", DataTypes.VARCHAR(40)), // record key
+      DataTypes.FIELD("rider", DataTypes.VARCHAR(20)),
+      DataTypes.FIELD("driver", DataTypes.VARCHAR(20)),
+      DataTypes.FIELD("fare", DataTypes.DOUBLE()),
+      DataTypes.FIELD("city", DataTypes.VARCHAR(20))).notNull();
+
+  /**
+   * Main entry point. The job can be submitted with the Flink CLI.
+   * Sample command: bin/flink run -c org.apache.hudi.examples.k8s.quickstart.HudiDataStreamWriter ${HUDI_FLINK_QUICKSTART_REPO}/target/hudi-examples-0.1.jar hudi_table "file:///tmp/hudi_table"
+   * The target base path is read from the TARGET_S3_PATH environment variable.
+   *
+   * @param args command line arguments
+   * @throws Exception if the Flink job fails
+   */
+  public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    // Enable checkpointing
+    configureCheckpointing(env);
+
+    DataStreamSource<RowData> dataStream = env.addSource(new SampleDataSource());
+
+    final String targetS3Path = System.getenv("TARGET_S3_PATH");
+    HoodiePipeline.Builder builder = createHudiPipeline("hudi_table", createHudiOptions(targetS3Path));
+    builder.sink(dataStream, false); // bounded = false: the source is a continuous stream
+    env.execute("Api_Sink");
+  }
+
+  /**
+   * Configure Flink checkpointing settings.
+   *
+   * @param env The Flink StreamExecutionEnvironment.
+   */
+  private static void configureCheckpointing(StreamExecutionEnvironment env) {
+    env.enableCheckpointing(5000); // Checkpoint every 5 seconds
+    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
+    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+    checkpointConfig.setMinPauseBetweenCheckpoints(10000); // Minimum time between checkpoints
+    checkpointConfig.setCheckpointTimeout(60000); // Checkpoint timeout in milliseconds
+  }
+
+  /**
+   * Create Hudi options for the data sink.
+   *
+   * @param basePath The base path for Hudi data.
+   * @return A Map containing Hudi options.
+   */
+  private static Map<String, String> createHudiOptions(String basePath) {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), basePath);
+    options.put(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key(), "s3a");
+    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
+    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+    options.put(FlinkOptions.IGNORE_FAILED.key(), "true");
+    return options;
+  }
+
+  /**
+   * Create a HoodiePipeline.Builder with the specified target table and options.
+   *
+   * @param targetTable The name of the Hudi table.
+   * @param options The Hudi options for the data sink.
+   * @return A HoodiePipeline.Builder.
+   */
+  private static HoodiePipeline.Builder createHudiPipeline(String targetTable, Map<String, String> options) {
+    return HoodiePipeline.builder(targetTable)
+        .column("ts TIMESTAMP(3)")
+        .column("uuid VARCHAR(40)")
+        .column("rider VARCHAR(20)")
+        .column("driver VARCHAR(20)")
+        .column("fare DOUBLE")
+        .column("city VARCHAR(20)")
+        .pk("uuid")
+        .partition("city")
+        .options(options);
+  }
+
+  /**
+   * Sample data source for generating RowData objects.
+   */
+  static class SampleDataSource implements SourceFunction<RowData> {
+    private volatile boolean isRunning = true;
+
+    @Override
+    public void run(SourceContext<RowData> ctx) throws Exception {
+      int batchNum = 0;
+      while (isRunning) {
+        batchNum++;
+        List<RowData> dataSetInsert = DataGenerator.generateRandomRowData(ROW_DATA_TYPE);
+        if (batchNum < 11) {
+          // For the first 10 batches, insert 4 records: 2 with random ids (INSERTs) and 2 with hardcoded UUIDs (UPDATEs).
+          for (RowData row : dataSetInsert) {
+            ctx.collect(row);
+          }
+        } else {
+          // For the 11th batch, emit a single record with row kind DELETE.
+          RowData rowToBeDeleted = dataSetInsert.get(2);
+          rowToBeDeleted.setRowKind(RowKind.DELETE);
+          ctx.collect(rowToBeDeleted);
+          TimeUnit.MILLISECONDS.sleep(10000);
+          // Stop the stream once the delete has been emitted
+          isRunning = false;
+        }
+        TimeUnit.MILLISECONDS.sleep(10000); // Simulate a delay between batches
+      }
+    }
+
+    @Override
+    public void cancel() {
+      isRunning = false;
+    }
+  }
+}
diff --git a/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/utils/DataGenerator.java b/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/utils/DataGenerator.java
new file mode 100644
index 00000000000..97bbeb9bbe5
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/utils/DataGenerator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.k8s.quickstart.utils;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.data.writer.BinaryWriter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Util class for testing data generation.
+ */
+public class DataGenerator {
+
+  public static List<RowData> generateRandomRowData(DataType dataType) {
+    // Every batch adds two new rows with a RANDOM uuid (inserts) and updates the rows with uuid
+    // "334e26e9-8355-45cc-97c6-c31daf0df330" and "7fd3fd07-cf04-4a1d-9511-142736932983".
+    return Arrays.asList(
+        DataGenerator.createRowData(dataType, TimestampData.fromEpochMillis(System.currentTimeMillis()),
+            StringData.fromString(UUID.randomUUID().toString()), StringData.fromString("rider-A"),
+            StringData.fromString("driver-K"), 1.0 + Math.random() * 90, StringData.fromString("san_francisco")),
+        DataGenerator.createRowData(dataType, TimestampData.fromEpochMillis(System.currentTimeMillis()),
+            StringData.fromString(UUID.randomUUID().toString()), StringData.fromString("rider-B"),
+            StringData.fromString("driver-M"), 1.0 + Math.random() * 90, StringData.fromString("brazil")),
+        DataGenerator.createRowData(dataType, TimestampData.fromEpochMillis(System.currentTimeMillis()),
+            StringData.fromString("334e26e9-8355-45cc-97c6-c31daf0df330"), StringData.fromString("rider-C"),
+            StringData.fromString("driver-L"), 15.4, StringData.fromString("chennai")),
+        DataGenerator.createRowData(dataType, TimestampData.fromEpochMillis(System.currentTimeMillis()),
+            StringData.fromString("7fd3fd07-cf04-4a1d-9511-142736932983"), StringData.fromString("rider-D"),
+            StringData.fromString("driver-N"), 1.0 + Math.random() * 90, StringData.fromString("london"))
+    );
+  }
+
+  public static BinaryRowData createRowData(DataType dataType, Object... fields) {
+    RowType rowType = (RowType) dataType.getLogicalType();
+    LogicalType[] types = rowType.getFields().stream().map(RowType.RowField::getType)
+        .toArray(LogicalType[]::new);
+    BinaryRowData row = new BinaryRowData(fields.length);
+    BinaryRowWriter writer = new BinaryRowWriter(row);
+    writer.reset();
+    for (int i = 0; i < fields.length; i++) {
+      Object field = fields[i];
+      if (field == null) {
+        writer.setNullAt(i);
+      } else {
+        BinaryWriter.write(writer, i, field, types[i], InternalSerializers.create(types[i]));
+      }
+    }
+    writer.complete();
+    return row;
+  }
+}
diff --git a/hudi-examples/pom.xml b/hudi-examples/pom.xml
index 6e890f5c34d..94f17f005fe 100644
--- a/hudi-examples/pom.xml
+++ b/hudi-examples/pom.xml
@@ -32,6 +32,7 @@
<module>hudi-examples-spark</module>
<module>hudi-examples-flink</module>
<module>hudi-examples-java</module>
+ <module>hudi-examples-k8s</module>
</modules>
</project>