This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c43fdba5 KUDU-3662 [3/n] Setup source and sink and tests
4c43fdba5 is described below

commit 4c43fdba56d453a13a15cd86c8c3ea7596d073dd
Author: Zoltan Chovan <zcho...@cloudera.com>
AuthorDate: Tue May 27 19:09:53 2025 +0200

    KUDU-3662 [3/n] Setup source and sink and tests
    
    - Introduce CustomReplicationRowResultConverter to detect delete
      rows on the source side
    - Add CustomReplicationOperationMapper to map Flink Row to Kudu
      upsert/delete operations
    - Wire source and sink using the above in ReplicationEnvProvider
    - Set up ReplicationTestBase with separate source and sink Kudu
      test harnesses to test replication logic
    - Add basic replication tests verifying row count and content equality
    
    Change-Id: I3dd7122933b125b6d4c6dd5dde27a7e8c3f790d4
    Reviewed-on: http://gerrit.cloudera.org:8080/23003
    Tested-by: Kudu Jenkins
    Reviewed-by: Marton Greber <greber...@gmail.com>
    Reviewed-by: Zoltan Martonka <zmarto...@cloudera.com>
---
 java/config/spotbugs/excludeFilter.xml             |   6 -
 java/gradle/dependencies.gradle                    |  89 +++++++------
 java/kudu-replication/build.gradle                 |  10 ++
 .../CustomReplicationOperationMapper.java          |  61 +++++++++
 .../CustomReplicationRowRestultConverter.java      |  78 ++++++++++++
 .../kudu/replication/ReplicationEnvProvider.java   |  35 ++++++
 .../kudu/replication/ReplicationTestBase.java      | 140 +++++++++++++++++++++
 .../apache/kudu/replication/TestReplication.java   |  72 +++++++++++
 .../src/test/resources/log4j2.properties           |  32 +++++
 .../java/org/apache/kudu/test/ClientTestUtil.java  |  19 ++-
 10 files changed, 490 insertions(+), 52 deletions(-)
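
For orientation, a minimal driver sketch showing how the pieces in this change fit
together. It is assembled only from code visible in this patch (the ReplicationJobConfig
builder calls and ReplicationEnvProvider usage mirror ReplicationTestBase and
TestReplication below); the class name, master addresses, table name, discovery interval,
and job name are placeholder values, and a command-line entry point is not part of this
commit.

    package org.apache.kudu.replication;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    /** Hypothetical driver sketch; not part of this patch. */
    public class ReplicationDriverSketch {
      public static void main(String[] args) throws Exception {
        // Placeholder values; the TODOs in ReplicationEnvProvider note that reader and
        // writer configs should eventually be parsed from the command line.
        ReplicationJobConfig config = ReplicationJobConfig.builder()
            .setSourceMasterAddresses("source-master:7051")
            .setSinkMasterAddresses("sink-master:7051")
            .setTableName("my_table")
            .setDiscoveryIntervalSeconds(10)
            .build();

        // getEnv() wires a KuduSource using CustomReplicationRowRestultConverter (deleted
        // rows arrive as RowKind.DELETE) to a KuduSink using CustomReplicationOperationMapper
        // (DELETE maps to DeleteIgnore, everything else to UpsertIgnore).
        StreamExecutionEnvironment env = new ReplicationEnvProvider(config).getEnv();
        env.execute("kudu-replication-sketch");
      }
    }

The tests below call executeAsync() instead, so assertions can run while the unbounded
job keeps streaming rows from the source cluster to the sink.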

diff --git a/java/config/spotbugs/excludeFilter.xml b/java/config/spotbugs/excludeFilter.xml
index 90080e9cb..11da3f997 100644
--- a/java/config/spotbugs/excludeFilter.xml
+++ b/java/config/spotbugs/excludeFilter.xml
@@ -375,10 +375,4 @@
         <Class name="org.apache.kudu.test.cluster.FakeDNS"/>
         <Bug pattern="DP_DO_INSIDE_DO_PRIVILEGED" />
     </Match>
-
-    <Match>
-        <!-- TODO: Remove this in follow-up commit. -->
-        <Class name="org.apache.kudu.replication.ReplicationEnvProvider" />
-        <Bug pattern="URF_UNREAD_FIELD" />
-    </Match>
 </FindBugsFilter>
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 988ae7ea3..cb6d23f7b 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -25,46 +25,47 @@ ext {
 }
 
 versions += [
-    assertj        : "3.27.3",
-    async          : "1.4.1",
-    checkstyle     : "8.36.1",
-    clojure        : "1.12.0",
-    clojureToolsCli: "1.1.230",
-    commonsIo      : "2.15.0",
-    errorProne     : "2.3.3",
-    errorProneJavac: "9+181-r4173-1",
-    flink          : "1.19.2",
-    gradle         : "7.6.4",
-    guava          : "33.3.1-jre",
-    hadoop         : "3.4.1",
-    hamcrest       : "3.0",
-    hdrhistogram   : "2.2.2",
-    hive           : "3.1.2",
-    httpClient     : "4.5.14",
-    jacoco         : "0.8.6",
-    jepsen         : "0.1.5",
-    jetty          : "9.4.57.v20241219",
-    jmh            : "1.37",
-    jsr305         : "3.0.2",
-    junit          : "4.13.2",
-    log4j          : "2.24.3",
-    logCaptor      : "2.10.0",
-    micrometer     : "1.8.2",
-    mockito        : "4.2.0",
-    murmur         : "1.0.0",
-    netty          : "4.1.115.Final",
-    osdetector     : "1.6.2",
-    protobuf       : "3.25.5",
-    ranger         : "2.1.0",
-    scala211       : "2.11.12",
-    scala          : "2.12.20",
-    scalatest      : "3.2.19",
-    scopt          : "4.1.0",
-    slf4j          : "1.7.36",
-    spark2         : "2.4.8",
-    spark          : "3.2.4",
-    spotBugs       : "4.1.1",
-    yetus          : "0.13.0"
+    assertj             : "3.27.3",
+    async               : "1.4.1",
+    checkstyle          : "8.36.1",
+    clojure             : "1.12.0",
+    clojureToolsCli     : "1.1.230",
+    commonsIo           : "2.15.0",
+    errorProne          : "2.3.3",
+    errorProneJavac     : "9+181-r4173-1",
+    flinkConnectorKudu  : "2.0.0-1.19",
+    flink               : "1.19.2",
+    gradle              : "7.6.4",
+    guava               : "33.3.1-jre",
+    hadoop              : "3.4.1",
+    hamcrest            : "3.0",
+    hdrhistogram        : "2.2.2",
+    hive                : "3.1.2",
+    httpClient          : "4.5.14",
+    jacoco              : "0.8.6",
+    jepsen              : "0.1.5",
+    jetty               : "9.4.57.v20241219",
+    jmh                 : "1.37",
+    jsr305              : "3.0.2",
+    junit               : "4.13.2",
+    log4j               : "2.24.3",
+    logCaptor           : "2.10.0",
+    micrometer          : "1.8.2",
+    mockito             : "4.2.0",
+    murmur              : "1.0.0",
+    netty               : "4.1.115.Final",
+    osdetector          : "1.6.2",
+    protobuf            : "3.25.5",
+    ranger              : "2.1.0",
+    scala211            : "2.11.12",
+    scala               : "2.12.20",
+    scalatest           : "3.2.19",
+    scopt               : "4.1.0",
+    slf4j               : "1.7.36",
+    spark2              : "2.4.8",
+    spark               : "3.2.4",
+    spotBugs            : "4.1.1",
+    yetus               : "0.13.0"
 ]
 
 // Log the Gradle version used vs defined.
@@ -93,6 +94,14 @@ libs += [
     commonsIo            : "commons-io:commons-io:$versions.commonsIo",
     errorProne           : "com.google.errorprone:error_prone_core:$versions.errorProne",
     errorProneJavac      : "com.google.errorprone:javac:$versions.errorProneJavac",
+    flinkClients         : "org.apache.flink:flink-clients:$versions.flink",
+    flinkConnectorBase   : "org.apache.flink:flink-connector-base:$versions.flink",
+    flinkCore            : "org.apache.flink:flink-core:$versions.flink",
+    flinkApiCommon       : "org.apache.flink:flink-api-common:$versions.flink",
+    flinkDist            : "org.apache.flink:flink-dist:$versions.flink",
+    flinkStreamingJava   : "org.apache.flink:flink-streaming-java:$versions.flink",
+    flinkConnectorKudu   : "org.apache.flink:flink-connector-kudu:$versions.flinkConnectorKudu",
+    flinkTestUtils       : "org.apache.flink:flink-test-utils:$versions.flink",
     flinkStreamingJava   : "org.apache.flink:flink-streaming-java:$versions.flink",
     guava                : "com.google.guava:guava:$versions.guava",
     hadoopClient         : "org.apache.hadoop:hadoop-client:$versions.hadoop",
diff --git a/java/kudu-replication/build.gradle b/java/kudu-replication/build.gradle
index 6c1c775b6..d2f5f2e43 100644
--- a/java/kudu-replication/build.gradle
+++ b/java/kudu-replication/build.gradle
@@ -20,9 +20,19 @@ apply from: "$rootDir/gradle/shadow.gradle"
 dependencies {
     // todo(zchovan) confirm if any dependencies should be shaded to avoid classpath issues
     implementation libs.flinkStreamingJava
+    implementation libs.flinkConnectorKudu
+    implementation libs.flinkCore
+    implementation libs.flinkClients
+    implementation libs.flinkConnectorBase
+    implementation project(path: ":kudu-client", configuration: "shadow")
 
     testImplementation libs.junit
     testImplementation libs.assertj
+    testImplementation libs.junit
+    testImplementation project(path: ":kudu-test-utils", configuration: "shadow")
+    testImplementation libs.log4jApi
+    testImplementation libs.log4jCore
+    testImplementation libs.log4jSlf4jImpl
 }
 
 // kudu-replication has no public Javadoc.
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationOperationMapper.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationOperationMapper.java
new file mode 100644
index 000000000..b5e44d78d
--- /dev/null
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationOperationMapper.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.connector.kudu.connector.writer.KuduOperationMapper;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+/**
+ * A custom implementation of {@link KuduOperationMapper} that maps a Flink {@link Row}
+ * to the appropriate Kudu {@link Operation} based on the row's kind.
+ * <p>
+ * If the row has kind {@link RowKind#DELETE}, a Kudu {@code DeleteIgnore} operation is created.
+ * Otherwise, an {@code UpsertIgnore} operation is used by default.
+ * </p>
+ * <p>
+ * All fields in the Flink row are written into the Kudu {@link PartialRow}.
+ * </p>
+ */
+public class CustomReplicationOperationMapper implements KuduOperationMapper<Row> {
+  private static final long serialVersionUID = 3364944402413430711L;
+
+  public Operation createBaseOperation(Row input, KuduTable table) {
+    RowKind kind = input.getKind();
+    if (kind == RowKind.DELETE) {
+      return table.newDeleteIgnore();
+    } else {
+      return table.newUpsertIgnore();
+    }
+  }
+
+  @Override
+  public List<Operation> createOperations(Row input, KuduTable table) {
+    Operation operation = createBaseOperation(input, table);
+    PartialRow partialRow = operation.getRow();
+    for (int i = 0; i < input.getArity(); i++) {
+      partialRow.addObject(i, input.getField(i));
+    }
+    return Collections.singletonList(operation);
+  }
+}
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationRowRestultConverter.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationRowRestultConverter.java
new file mode 100644
index 000000000..bced43532
--- /dev/null
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationRowRestultConverter.java
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/*
+ * TODO(mgreber): remove this file once FLINK-37473 is resolved.
+ *
+ * This class should live in the official Flink Kudu source connector.
+ * Until then, it's included here as a temporary workaround in the replication job source.
+ */
+
+package org.apache.kudu.replication;
+
+import org.apache.flink.connector.kudu.connector.converter.RowResultConverter;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.RowResult;
+
+/**
+ * A custom implementation of {@link RowResultConverter} that converts a Kudu {@link RowResult}
+ * into a Flink {@link Row}.
+ */
+public class CustomReplicationRowRestultConverter implements RowResultConverter<Row> {
+  private static final long serialVersionUID = -2668642733689083069L;
+
+  public CustomReplicationRowRestultConverter() {
+  }
+
+  /**
+   * If the Kudu schema has a delete marker column (as indicated by {@link Schema#hasIsDeleted()}),
+   * and the row is marked as deleted, the resulting Flink row's kind is set to
+   * {@link RowKind#DELETE}.
+   *
+   * The special delete marker column (accessible via {@link Schema#getIsDeletedIndex()}) is
+   * excluded from the Flink row's fields. All other non-null fields are copied into the Flink
+   * {@code Row}.
+   *
+   * @param row the Kudu {@code RowResult} to convert
+   * @return a Flink {@code Row} representation, with deletion semantics if applicable
+   */
+  @Override
+  public Row convert(RowResult row) {
+    Schema schema = row.getColumnProjection();
+    int columnCount = schema.hasIsDeleted() ? schema.getColumnCount() - 1 : schema.getColumnCount();
+    Row values = new Row(columnCount);
+
+    int isDeletedIndex = schema.hasIsDeleted() ? schema.getIsDeletedIndex() : -1;
+
+    if (row.hasIsDeleted() && row.isDeleted()) {
+      values.setKind(RowKind.DELETE);
+    }
+    schema.getColumns().forEach(column -> {
+      int pos = schema.getColumnIndex(column.getName());
+      // Skip the isDeleted column if it exists
+      if (pos == isDeletedIndex && row.hasIsDeleted()) {
+        return;
+      }
+      if (!row.isNull(pos)) {
+        values.setField(pos, row.getObject(pos));
+      }
+    });
+
+    return values;
+  }
+}
diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
index 118526bcd..4c874bf05 100644
--- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
+++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
@@ -15,7 +15,19 @@
 
 package org.apache.kudu.replication;
 
+import java.time.Duration;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kudu.connector.KuduTableInfo;
+import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connector.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connector.kudu.sink.KuduSink;
+import org.apache.flink.connector.kudu.sink.KuduSinkBuilder;
+import org.apache.flink.connector.kudu.source.KuduSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
 
 /**
  * Provides a configured {@link StreamExecutionEnvironment} for the replication job.
@@ -36,6 +48,29 @@ public class ReplicationEnvProvider {
    */
   public StreamExecutionEnvironment getEnv() throws Exception {
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    // TODO(mgreber): implement and use reader config parsed from the command line.
+    KuduSource<Row> kuduSource = KuduSource.<Row>builder()
+            .setReaderConfig(KuduReaderConfig.Builder
+                  .setMasters(jobConfig.getSourceMasterAddresses()).build())
+            .setTableInfo(KuduTableInfo.forTable(jobConfig.getTableName()))
+            .setRowResultConverter(new CustomReplicationRowRestultConverter())
+            .setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED)
+            .setDiscoveryPeriod(Duration.ofSeconds(jobConfig.getDiscoveryIntervalSeconds()))
+            .build();
+
+    // TODO(mgreber): implement and use writer config parsed from command line.
+    KuduSink<Row> kuduSink = new KuduSinkBuilder<Row>()
+            .setWriterConfig(KuduWriterConfig.Builder
+                    .setMasters(jobConfig.getSinkMasterAddresses()).build())
+            .setTableInfo(KuduTableInfo.forTable(jobConfig.getSinkTableName()))
+            .setOperationMapper(new CustomReplicationOperationMapper())
+            .build();
+
+    env.fromSource(kuduSource, WatermarkStrategy.noWatermarks(), "KuduSource")
+            .returns(TypeInformation.of(Row.class))
+            .sinkTo(kuduSink);
+
     return env;
   }
 }
diff --git a/java/kudu-replication/src/test/java/org/apache/kudu/replication/ReplicationTestBase.java b/java/kudu-replication/src/test/java/org/apache/kudu/replication/ReplicationTestBase.java
new file mode 100644
index 000000000..4bf7ca384
--- /dev/null
+++ b/java/kudu-replication/src/test/java/org/apache/kudu/replication/ReplicationTestBase.java
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication;
+
+import static org.apache.kudu.test.ClientTestUtil.getAllTypesCreateTableOptions;
+import static org.apache.kudu.test.ClientTestUtil.getPartialRowWithAllTypes;
+import static org.apache.kudu.test.ClientTestUtil.getSchemaWithAllTypes;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Rule;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.apache.kudu.test.KuduTestHarness;
+
+public class ReplicationTestBase {
+  protected static final String TABLE_NAME = "replication_test_table";
+
+  @Rule
+  public final KuduTestHarness sourceHarness = new KuduTestHarness();
+  @Rule
+  public final KuduTestHarness sinkHarness = new KuduTestHarness();
+
+  protected KuduClient sourceClient;
+  protected KuduClient sinkClient;
+
+  @Before
+  public void setupClients() {
+    this.sourceClient = sourceHarness.getClient();
+    this.sinkClient = sinkHarness.getClient();
+  }
+
+
+  protected ReplicationJobConfig createDefaultJobConfig() {
+    ReplicationJobConfig jobConfig = ReplicationJobConfig.builder()
+            .setSourceMasterAddresses(sourceHarness.getMasterAddressesAsString())
+            .setSinkMasterAddresses(sinkHarness.getMasterAddressesAsString())
+            .setTableName(TABLE_NAME)
+            .setDiscoveryIntervalSeconds(2)
+            .build();
+    return jobConfig;
+  }
+
+  protected void createAllTypesTable(KuduClient client) throws Exception {
+    Schema schema = getSchemaWithAllTypes();
+    CreateTableOptions options = getAllTypesCreateTableOptions();
+    client.createTable(TABLE_NAME, schema, options);
+  }
+
+  protected void insertRowsIntoAllTypesTable(
+          KuduClient client, int startKey, int count) throws Exception {
+    KuduTable table = client.openTable(TABLE_NAME);
+    KuduSession session = client.newSession();
+    for (int i = 0; i < count; i++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      getPartialRowWithAllTypes(row, (byte) (startKey + i));
+      session.apply(insert);
+    }
+    session.flush();
+    session.close();
+  }
+
+  protected void verifySourceAndSinkRowsEqual(int expectedRowCount) throws Exception {
+    Map<Byte, RowResult> sourceMap = buildRowMapByInt8Key(sourceClient);
+    Map<Byte, RowResult> sinkMap = buildRowMapByInt8Key(sinkClient);
+
+    for (int i = 0; i < expectedRowCount; i++) {
+      byte key = (byte) i;
+
+      RowResult sourceRow = sourceMap.get(key);
+      RowResult sinkRow = sinkMap.get(key);
+
+      if (sourceRow == null || sinkRow == null) {
+        throw new AssertionError("Missing row with int8 = " + key +
+                "\nSource has: " + (sourceRow != null) +
+                "\nSink has: " + (sinkRow != null));
+      }
+
+      compareRowResults(sourceRow, sinkRow, key);
+    }
+  }
+
+  private Map<Byte, RowResult> buildRowMapByInt8Key(KuduClient client) throws Exception {
+    Map<Byte, RowResult> map = new HashMap<>();
+    KuduTable table = client.openTable(TABLE_NAME);
+    KuduScanner scanner = client.newScannerBuilder(table).build();
+
+    while (scanner.hasMoreRows()) {
+      RowResultIterator it = scanner.nextRows();
+      while (it.hasNext()) {
+        RowResult row = it.next();
+        byte key = row.getByte("int8");
+        map.put(key, row);
+      }
+    }
+
+    return map;
+  }
+
+  private void compareRowResults(RowResult a, RowResult b, byte key) {
+    Schema schema = a.getSchema();
+    for (int i = 0; i < schema.getColumnCount(); i++) {
+      String colName = schema.getColumnByIndex(i).getName();
+
+      Object valA = a.isNull(i) ? null : a.getObject(i);
+      Object valB = b.isNull(i) ? null : b.getObject(i);
+
+      if (!java.util.Objects.deepEquals(valA, valB)) {
+        throw new AssertionError("Mismatch in column '" + colName + "' for 
int8 = " + key +
+                "\nSource: " + valA +
+                "\nSink:   " + valB);
+      }
+    }
+  }
+}
diff --git a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
new file mode 100644
index 000000000..a6b2191d7
--- /dev/null
+++ b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication;
+
+import static org.apache.kudu.test.ClientTestUtil.countRowsInTable;
+import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.KuduTable;
+
+
+
+public class TestReplication extends ReplicationTestBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplication.class);
+
+  @Test
+  public void testBasicReplication() throws Exception {
+    createAllTypesTable(sourceClient);
+    insertRowsIntoAllTypesTable(sourceClient, 0, 10);
+
+    createAllTypesTable(sinkClient);
+
+    ReplicationEnvProvider executor = new ReplicationEnvProvider(createDefaultJobConfig());
+    executor.getEnv().executeAsync();
+
+    KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+    assertEventuallyTrue("Initial 10 rows should be replicated",
+        () -> countRowsInTable(sinkTable) == 10,60000);
+
+    verifySourceAndSinkRowsEqual(10);
+  }
+
+  @Test
+  public void testContinuousReplication() throws Exception {
+    createAllTypesTable(sourceClient);
+    createAllTypesTable(sinkClient);
+
+    ReplicationEnvProvider executor = new ReplicationEnvProvider(createDefaultJobConfig());
+    executor.getEnv().executeAsync();
+
+    KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+
+    insertRowsIntoAllTypesTable(sourceClient, 0, 10);
+
+    assertEventuallyTrue("Initial 10 rows should be replicated",
+        () -> countRowsInTable(sinkTable) == 10, 60000);
+
+    insertRowsIntoAllTypesTable(sourceClient, 10, 10);
+
+    assertEventuallyTrue("Additional 10 rows should be replicated, total 20 
rows",
+        () -> countRowsInTable(sinkTable) == 20, 60000);
+
+    verifySourceAndSinkRowsEqual(20);
+  }
+}
diff --git a/java/kudu-replication/src/test/resources/log4j2.properties b/java/kudu-replication/src/test/resources/log4j2.properties
new file mode 100644
index 000000000..22762a156
--- /dev/null
+++ b/java/kudu-replication/src/test/resources/log4j2.properties
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+status = error
+name = PropertiesConfig
+appenders = console
+
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n
+
+rootLogger.level = info
+rootLogger.appenderRefs = stdout
+rootLogger.appenderRef.stdout.ref = STDOUT
+
+logger.kudu.name = org.apache.kudu
+logger.kudu.level = debug
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
index 72b7a2690..3cd85d015 100644
--- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
@@ -228,13 +228,15 @@ public abstract class ClientTestUtil {
     return new Schema(columns);
   }
 
-  public static PartialRow getPartialRowWithAllTypes() {
-    Schema schema = getSchemaWithAllTypes();
-    // Ensure we aren't missing any types
-    assertEquals(15, schema.getColumnCount());
+  public static PartialRow getPartialRowWithAllTypes(PartialRow row, byte int8Value) {
+    if (row == null) {
+      Schema schema = getSchemaWithAllTypes();
+      // Ensure we aren't missing any types
+      assertEquals(15, schema.getColumnCount());
+      row = schema.newPartialRow();
+    }
 
-    PartialRow row = schema.newPartialRow();
-    row.addByte("int8", (byte) 42);
+    row.addByte("int8", int8Value);
     row.addShort("int16", (short) 43);
     row.addInt("int32", 44);
     row.addLong("int64", 45);
@@ -250,9 +252,14 @@ public abstract class ClientTestUtil {
     row.addBinary("binary-bytebuffer", binaryBuffer);
     row.setNull("null");
     row.addDecimal("decimal", BigDecimal.valueOf(12345, 3));
+
     return row;
   }
 
+  public static PartialRow getPartialRowWithAllTypes() {
+    return getPartialRowWithAllTypes(null, (byte) 42);
+  }
+
   public static CreateTableOptions getAllTypesCreateTableOptions() {
     return new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("int8"));
   }
