This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch flink-replication
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit d7d80687ec9214500ace5463756627153ae8e7cb
Author: zchovan <[email protected]>
AuthorDate: Tue Jan 7 13:52:09 2025 +0100

    Replace Apache Bahir with flink-connector
    
    This commit replaces the Apache Bahir dependency with the Flink Kudu
    connector.
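
    Concretely, the connector's Maven coordinates change as follows (both
    lines are taken from the java/gradle/dependencies.gradle update below;
    note the dropped Scala 2.11 suffix):

        // old: Apache Bahir release
        //   org.apache.bahir:flink-connector-kudu_2.11:1.1.0
        // new: connector maintained by Apache Flink
        //   org.apache.flink:flink-connector-kudu:2.0-SNAPSHOT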
    
    Change-Id: Ia1b8d1fb504908d2fc1f693fd3a08a578d72cabe
---
 java/build.gradle                                  |  5 +++++
 java/gradle/dependencies.gradle                    |  4 ++--
 .../apache/kudu/replication/ReplicationJob.java    |  4 ++++
 ...eplicationHarness.java => TestReplication.java} | 25 ++++++++++++++--------
 4 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/java/build.gradle b/java/build.gradle
index ea8f68930..a8eb67c55 100755
--- a/java/build.gradle
+++ b/java/build.gradle
@@ -36,6 +36,11 @@ allprojects {
   repositories {
     mavenCentral()
     mavenLocal()
+    // TODO(zchovan): remove this, once the kudu connector is published upstream
+    // temporarily add downstream nexus
+    maven() {
+      url = uri("https://nexus-private.infra.cloudera.com/nexus/")
+    }
   }
 
   // Read the version.txt file to set the project version
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 354dad276..688bdf8a2 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -26,7 +26,7 @@ ext {
 
 versions += [
     async          : "1.4.1",
-    bahir          : "1.1.0",
+    bahir          : "2.0-SNAPSHOT",
     checkstyle     : "8.36.1",
     clojure        : "1.10.3",
     clojureToolsCli: "1.0.206",
@@ -97,7 +97,7 @@ libs += [
     flinkConnectorBase   : "org.apache.flink:flink-connector-base:$versions.flink",
     flinkCore            : "org.apache.flink:flink-core:$versions.flink",
     flinkDist            : "org.apache.flink:flink-dist:$versions.flink",
-    flinkKuduConnector   : "org.apache.bahir:flink-connector-kudu_2.11:$versions.bahir",
+    flinkKuduConnector   : "org.apache.flink:flink-connector-kudu:$versions.bahir",
     flinkStreamingJava   : "org.apache.flink:flink-streaming-java:$versions.flink",
     guava                : "com.google.guava:guava:$versions.guava",
     hadoopClient         : "org.apache.hadoop:hadoop-client:$versions.hadoop",
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
index 0cb08e181..1c83fd9b2 100644
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJob.java
@@ -22,10 +22,14 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import java.util.ArrayList;
 import java.util.Collections;
 
+/**
+ * This class is used to submit Kudu replication jobs to a Flink cluster.
+ */
 public class ReplicationJob {
     static ReplicationJobConfig config;
 
     public static void main(String[] args) throws Exception {
+        // Read parameters from the command-line args, then convert them into a ReplicationJobConfig object.
         ParameterTool parameters = ParameterTool.fromArgs(args);
 
         config.setSourceMasterAddresses(new ArrayList<>(
diff --git a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationHarness.java b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
similarity index 69%
rename from java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationHarness.java
rename to java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
index 8ee06510a..d62a9209d 100644
--- a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationHarness.java
+++ b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
@@ -1,7 +1,5 @@
 package org.apache.kudu.replication;
 
-import org.apache.kudu.Schema;
-import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.test.KuduTestHarness;
 import org.junit.Before;
@@ -12,11 +10,14 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
+import static org.apache.kudu.test.ClientTestUtil.createTableWithOneThousandRows;
 import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
+import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
-public class TestReplicationHarness {
-    private static final Logger LOG = 
LoggerFactory.getLogger(TestReplicationHarness.class);
+public class TestReplication {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestReplication.class);
     private static final String TABLE_NAME = "replication_test_table";
 
     @Rule
@@ -29,16 +30,22 @@ public class TestReplicationHarness {
 
     @Before
     public void setup()  {
-        CreateTableOptions builder = new CreateTableOptions().setNumReplicas(3);
-        Schema basicSchema = getBasicSchema();
-
         this.sourceClient = sourceHarness.getClient();
         this.sinkClient = sinkHarness.getClient();
-
     }
 
     @Test
-    public void BenchmarkReplicationScenario() {
+    public void TestBasicReplication() {
+        // Set up the source table
+        try {
+            createTableWithOneThousandRows(
+                    this.sourceHarness.getAsyncClient(), TABLE_NAME, 32 * 1024, DEFAULT_SLEEP);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOG.error(e.getMessage());
+            fail(e.getMessage());
+        }
+
         ReplicationJobConfig config = new ReplicationJobConfig();
 
         config.setSourceMasterAddresses(Arrays.asList(sourceHarness.getMasterAddressesAsString().split(",")));
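
A minimal sketch of how the resulting entry point could be driven, assuming a
"sourceMasterAddresses" parameter key (the hunk above is truncated before the
ParameterTool lookup, so only ParameterTool.fromArgs() and
ReplicationJobConfig.setSourceMasterAddresses() are confirmed by the diff;
the key name and surrounding class are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;

    import org.apache.flink.api.java.utils.ParameterTool;

    public class ReplicationJobSketch {
        public static void main(String[] args) throws Exception {
            // e.g. args = {"--sourceMasterAddresses", "master-1:7051,master-2:7051"}
            ParameterTool parameters = ParameterTool.fromArgs(args);
            // Instantiate the config before use (the class in the diff keeps
            // a static, uninitialized field).
            ReplicationJobConfig config = new ReplicationJobConfig();
            // Split the comma-separated master list, mirroring the call
            // visible in the hunk above.
            config.setSourceMasterAddresses(new ArrayList<>(
                    Arrays.asList(parameters
                            .getRequired("sourceMasterAddresses")
                            .split(","))));
        }
    }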
