This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch flink-replication
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 44fc8f31ba28af7c59e9fdae37a27e5d6d9f92e5
Author: zchovan <[email protected]>
AuthorDate: Mon Nov 18 15:19:13 2024 +0100

    [java] init kudu-replication subproject
    
    This change introduces a new gradle subproject 'kudu-replication'. This
    subproject contains the new Kudu replication Flink job and its related
    resources/dependencies.
    
    In this commit:
    * new Java dependencies were added, for Flink and its Kudu connector:
        * org.apache.flink:flink-clients
        * org.apache.flink:flink-connector-base
        * org.apache.flink:flink-core
        * org.apache.flink:flink-dist
        * org.apache.bahir:flink-connector-kudu_2.11
        * org.apache.flink:flink-streaming-java
    * created a new Java class 'ReplicationJob' that is executable by Flink
    * created a new configuration class for the ReplicationJob
    * created an initial test setup for replication
    
    Change-Id: Idf2dfee6e6458aa4a559698792e6a1139e67f338
---
 java/gradle/dependencies.gradle                    |  10 ++
 .../build.gradle}                                  |  33 +++---
 .../org/apache/kudu/replication/KuduSource.java    | 127 +++++++++++++++++++++
 .../apache/kudu/replication/ReplicationJob.java    |  41 +++++++
 .../kudu/replication/ReplicationJobConfig.java     |  50 ++++++++
 .../kudu/replication/ReplicationJobExecutor.java}  |  39 ++++---
 .../kudu/replication/TestReplicationHarness.java   |  54 +++++++++
 .../src/test/resources/log4j2.properties           |  32 ++++++
 java/settings.gradle                               |   1 +
 9 files changed, 359 insertions(+), 28 deletions(-)

diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index d0015dd4b..354dad276 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -26,12 +26,14 @@ ext {
 
 versions += [
     async          : "1.4.1",
+    bahir          : "1.1.0",
     checkstyle     : "8.36.1",
     clojure        : "1.10.3",
     clojureToolsCli: "1.0.206",
     commonsIo      : "2.11.0",
     errorProne     : "2.3.3",
     errorProneJavac: "9+181-r4173-1",
+    flink          : "1.19.1",
     gradle         : "7.6.4",
     guava          : "32.1.1-jre",
     hadoop         : "3.3.1",
@@ -62,6 +64,7 @@ versions += [
     spark2         : "2.4.8",
     spark          : "3.2.4",
     spotBugs       : "4.1.1",
+    testcontainers : "1.20.3",
     yetus          : "0.13.0"
 ]
 
@@ -90,6 +93,12 @@ libs += [
     commonsIo            : "commons-io:commons-io:$versions.commonsIo",
     errorProne           : 
"com.google.errorprone:error_prone_core:$versions.errorProne",
     errorProneJavac      : 
"com.google.errorprone:javac:$versions.errorProneJavac",
+    flinkClients         : "org.apache.flink:flink-clients:$versions.flink",
+    flinkConnectorBase   : 
"org.apache.flink:flink-connector-base:$versions.flink",
+    flinkCore            : "org.apache.flink:flink-core:$versions.flink",
+    flinkDist            : "org.apache.flink:flink-dist:$versions.flink",
+    flinkKuduConnector   : 
"org.apache.bahir:flink-connector-kudu_2.11:$versions.bahir",
+    flinkStreamingJava   : 
"org.apache.flink:flink-streaming-java:$versions.flink",
     guava                : "com.google.guava:guava:$versions.guava",
     hadoopClient         : "org.apache.hadoop:hadoop-client:$versions.hadoop",
     hadoopCommon         : "org.apache.hadoop:hadoop-common:$versions.hadoop",
@@ -138,5 +147,6 @@ libs += [
     sparkCore            : 
"org.apache.spark:spark-core_$versions.scalaBase:$versions.spark",
     sparkSql             : 
"org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark",
     sparkSqlTest         : 
"org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark:tests",
+    testcontainers       : 
"org.testcontainers:testcontainers:$versions.testcontainers",
     yetusAnnotations     : 
"org.apache.yetus:audience-annotations:$versions.yetus"
 ]
diff --git a/java/settings.gradle b/java/kudu-replication/build.gradle
similarity index 53%
copy from java/settings.gradle
copy to java/kudu-replication/build.gradle
index ebe587ab8..9ba64bbc8 100644
--- a/java/settings.gradle
+++ b/java/kudu-replication/build.gradle
@@ -15,18 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// This file contains the configuration of the project hierarchy.
-// Mainly we just define what subprojects are in the build.
+apply from: "$rootDir/gradle/shadow.gradle"
 
-rootProject.name = "kudu-parent"
-include "kudu-backup"
-include "kudu-backup-common"
-include "kudu-backup-tools"
-include "kudu-client"
-include "kudu-hive"
-include "kudu-jepsen"
-include "kudu-proto"
-include "kudu-spark"
-include "kudu-spark-tools"
-include "kudu-subprocess"
-include "kudu-test-utils"
+dependencies {
+    implementation libs.flinkKuduConnector
+    implementation libs.flinkCore
+    implementation libs.flinkStreamingJava
+    implementation libs.flinkClients
+    implementation libs.flinkConnectorBase
+
+    implementation project(path: ":kudu-client", configuration: "shadow")
+
+    testImplementation libs.junit
+    testImplementation libs.log4jApi
+    testImplementation libs.log4jCore
+    testImplementation libs.log4jSlf4jImpl
+    testImplementation libs.testcontainers
+    testImplementation project(path: ":kudu-test-utils", configuration: 
"shadow")
+    testImplementation libs.jmhCore
+    testImplementation libs.jmhGenerator
+}
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java
new file mode 100644
index 000000000..eeb616903
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/KuduSource.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kudu.replication;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.kudu.client.*;
+import org.apache.kudu.util.HybridTimeUtil;
+
+public class KuduSource extends RichSourceFunction<String> {
+    private final List<String> kuduMasters;
+    private final String tableName;
+    private transient KuduClient client;
+    private volatile boolean isRunning = true;
+    private final long scanIntervalMillis = 5000; // Set your desired scan 
interval
+    private long lastTimestamp = 0; // Track the last timestamp
+    private boolean isFirstScan = true; // Track if it's the first scan
+
+    public KuduSource(List<String> masterAddresses, String tableName) {
+        this.kuduMasters = masterAddresses;
+        this.tableName = tableName;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        try {
+            client = new KuduClient.KuduClientBuilder(kuduMasters).build();
+            System.out.println("Kudu client initialized.");
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException("Failed to initialize Kudu client.", e);
+        }
+    }
+
+    @Override
+    public void run(SourceContext<String> ctx) throws Exception {
+        while (isRunning) {
+            KuduTable table = client.openTable(tableName);
+            long currentMicros = 
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+            long currentTimestamp = 
HybridTimeUtil.physicalAndLogicalToHTTimestamp(currentMicros, 0) + 1;
+            List<KuduScanToken> tokens;
+            if (isFirstScan) {
+                // Perform the initial snapshot scan
+                tokens = client.newScanTokenBuilder(table)
+                        .snapshotTimestampRaw(currentTimestamp)
+                        .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
+                        .build();
+                for (KuduScanToken token : tokens) {
+                    KuduScanner scanner = 
KuduScanToken.deserializeIntoScanner(token.serialize(), client);
+                    while (scanner.hasMoreRows()) {
+                        RowResultIterator results = scanner.nextRows();
+                        while (results.hasNext()) {
+                            RowResult result = results.next();
+                            String rowData = result.rowToString();
+                            System.out.println("Fetched row: " + rowData);
+                            synchronized (ctx.getCheckpointLock()) {
+                                ctx.collect(rowData);
+                            }
+                        }
+                    }
+                }
+                isFirstScan = false; // Mark the first scan as complete
+            } else {
+                // Perform the timestamp-based diff scan
+                tokens = client.newScanTokenBuilder(table)
+                        .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
+                        .diffScan(lastTimestamp, currentTimestamp)
+                        .build();
+                for (KuduScanToken token : tokens) {
+                    KuduScanner scanner = 
KuduScanToken.deserializeIntoScanner(token.serialize(), client);
+                    while (scanner.hasMoreRows()) {
+                        RowResultIterator results = scanner.nextRows();
+                        while (results.hasNext()) {
+                            RowResult result = results.next();
+                            String rowData = result.rowToString();
+                            System.out.println("Fetched row: " + rowData);
+                            synchronized (ctx.getCheckpointLock()) {
+                                ctx.collect(rowData);
+                            }
+                        }
+                    }
+                }
+            }
+            // Update the last timestamp for the next diff scan
+            lastTimestamp = currentTimestamp;
+            // Sleep for the defined interval before the next scan
+            Thread.sleep(scanIntervalMillis);
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+        if (client != null) {
+            try {
+                client.close();
+            } catch (KuduException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (client != null) {
+            client.close();
+        }
+    }
+}
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
new file mode 100644
index 000000000..0cb08e181
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.replication;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class ReplicationJob {
+    static ReplicationJobConfig config;
+
+    public static void main(String[] args) throws Exception {
+        ParameterTool parameters = ParameterTool.fromArgs(args);
+
+        config.setSourceMasterAddresses(new ArrayList<>(
+                
Collections.singletonList(parameters.get("sourceMasterAddresses", 
"127.0.0.1:8764"))));
+        config.setSinkMasterAddresses(new ArrayList<>(
+                
Collections.singletonList(parameters.get("sinkMasterAddresses", 
"127.0.0.1:8764"))));
+        config.setTableName(parameters.get("tableName", "test_table"));
+
+        ReplicationJobExecutor replicationJobExecutor = new 
ReplicationJobExecutor(config);
+        replicationJobExecutor.runJob();
+    }
+}
+
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
new file mode 100644
index 000000000..52c644b60
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.replication;
+
+import java.util.List;
+
+public class ReplicationJobConfig {
+    private List<String> sourceMasterAddresses;
+    private List<String> sinkMasterAddresses;
+    private String tableName;
+
+    public List<String> getSourceMasterAddresses() {
+        return sourceMasterAddresses;
+    }
+
+    public void setSourceMasterAddresses(List<String> sourceMasterAddresses) {
+        this.sourceMasterAddresses = sourceMasterAddresses;
+    }
+
+    public List<String> getSinkMasterAddresses() {
+        return sinkMasterAddresses;
+    }
+
+    public void setSinkMasterAddresses(List<String> sinkMasterAddresses) {
+        this.sinkMasterAddresses = sinkMasterAddresses;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+}
diff --git a/java/settings.gradle 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
similarity index 50%
copy from java/settings.gradle
copy to 
java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
index ebe587ab8..6ad63ae09 100644
--- a/java/settings.gradle
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobExecutor.java
@@ -15,18 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// This file contains the configuration of the project hierarchy.
-// Mainly we just define what subprojects are in the build.
+package org.apache.kudu.replication;
 
-rootProject.name = "kudu-parent"
-include "kudu-backup"
-include "kudu-backup-common"
-include "kudu-backup-tools"
-include "kudu-client"
-include "kudu-hive"
-include "kudu-jepsen"
-include "kudu-proto"
-include "kudu-spark"
-include "kudu-spark-tools"
-include "kudu-subprocess"
-include "kudu-test-utils"
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+class ReplicationJobExecutor {
+    ReplicationJobConfig config;
+
+    ReplicationJobExecutor(ReplicationJobConfig config) {
+        this.config = config;
+    }
+
+    public void runJob() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<String> kuduSourceStream = env.addSource(
+                new KuduSource(config.getSourceMasterAddresses(), 
config.getTableName()));
+
+        kuduSourceStream.print();
+
+        try {
+            env.execute("Kudu Source Job");
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+        }
+    }
+}
diff --git 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationHarness.java
 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationHarness.java
new file mode 100644
index 000000000..8ee06510a
--- /dev/null
+++ 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationHarness.java
@@ -0,0 +1,54 @@
+package org.apache.kudu.replication;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
+import static org.junit.Assert.assertTrue;
+
+public class TestReplicationHarness {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestReplicationHarness.class);
+    private static final String TABLE_NAME = "replication_test_table";
+
+    @Rule
+    public final KuduTestHarness sourceHarness = new KuduTestHarness();
+    @Rule
+    public final KuduTestHarness sinkHarness = new KuduTestHarness();
+
+    private KuduClient sourceClient;
+    private KuduClient sinkClient;
+
+    @Before
+    public void setup()  {
+        CreateTableOptions builder = new 
CreateTableOptions().setNumReplicas(3);
+        Schema basicSchema = getBasicSchema();
+
+        this.sourceClient = sourceHarness.getClient();
+        this.sinkClient = sinkHarness.getClient();
+
+    }
+
+    @Test
+    public void BenchmarkReplicationScenario() {
+        ReplicationJobConfig config = new ReplicationJobConfig();
+
+        
config.setSourceMasterAddresses(Arrays.asList(sourceHarness.getMasterAddressesAsString().split(",")));
+        
config.setSinkMasterAddresses(Arrays.asList(sinkHarness.getMasterAddressesAsString().split(",")));
+        config.setTableName(TABLE_NAME);
+
+        ReplicationJobExecutor executor = new ReplicationJobExecutor(config);
+        executor.runJob();
+
+        assertTrue(true);
+    }
+
+}
diff --git a/java/kudu-replication/src/test/resources/log4j2.properties 
b/java/kudu-replication/src/test/resources/log4j2.properties
new file mode 100644
index 000000000..e85deec99
--- /dev/null
+++ b/java/kudu-replication/src/test/resources/log4j2.properties
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+status = error
+name = PropertiesConfig
+appenders = console
+
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n
+
+rootLogger.level = info
+rootLogger.appenderRefs = stdout
+rootLogger.appenderRef.stdout.ref = STDOUT
+
+logger.kudu.name = org.apache.kudu
+logger.kudu.level = debug
\ No newline at end of file
diff --git a/java/settings.gradle b/java/settings.gradle
index ebe587ab8..7b62d7b1d 100644
--- a/java/settings.gradle
+++ b/java/settings.gradle
@@ -26,6 +26,7 @@ include "kudu-client"
 include "kudu-hive"
 include "kudu-jepsen"
 include "kudu-proto"
+include "kudu-replication"
 include "kudu-spark"
 include "kudu-spark-tools"
 include "kudu-subprocess"

Reply via email to