This is an automated email from the ASF dual-hosted git repository. mgreber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 4c43fdba5 KUDU-3662 [3/n] Setup source and sink and tests 4c43fdba5 is described below commit 4c43fdba56d453a13a15cd86c8c3ea7596d073dd Author: Zoltan Chovan <zcho...@cloudera.com> AuthorDate: Tue May 27 19:09:53 2025 +0200 KUDU-3662 [3/n] Setup source and sink and tests - Introduce CustomReplicationRowResultConverter to detect delete rows on the source side - Add CustomReplicationOperationMapper to map Flink Row to Kudu upsert/delete operations - Wire source and sink using the above in ReplicationEnvProvider - Set up ReplicationTestBase with separate source and sink Kudu test harnesses to test replication logic - Add basic replication tests verifying row count and content equality Change-Id: I3dd7122933b125b6d4c6dd5dde27a7e8c3f790d4 Reviewed-on: http://gerrit.cloudera.org:8080/23003 Tested-by: Kudu Jenkins Reviewed-by: Marton Greber <greber...@gmail.com> Reviewed-by: Zoltan Martonka <zmarto...@cloudera.com> --- java/config/spotbugs/excludeFilter.xml | 6 - java/gradle/dependencies.gradle | 89 +++++++------ java/kudu-replication/build.gradle | 10 ++ .../CustomReplicationOperationMapper.java | 61 +++++++++ .../CustomReplicationRowRestultConverter.java | 78 ++++++++++++ .../kudu/replication/ReplicationEnvProvider.java | 35 ++++++ .../kudu/replication/ReplicationTestBase.java | 140 +++++++++++++++++++++ .../apache/kudu/replication/TestReplication.java | 72 +++++++++++ .../src/test/resources/log4j2.properties | 32 +++++ .../java/org/apache/kudu/test/ClientTestUtil.java | 19 ++- 10 files changed, 490 insertions(+), 52 deletions(-) diff --git a/java/config/spotbugs/excludeFilter.xml b/java/config/spotbugs/excludeFilter.xml index 90080e9cb..11da3f997 100644 --- a/java/config/spotbugs/excludeFilter.xml +++ b/java/config/spotbugs/excludeFilter.xml @@ -375,10 +375,4 @@ <Class name="org.apache.kudu.test.cluster.FakeDNS"/> <Bug pattern="DP_DO_INSIDE_DO_PRIVILEGED" /> </Match> - - <Match> - <!-- TODO: Remove this in follow-up commit. --> - <Class name="org.apache.kudu.replication.ReplicationEnvProvider" /> - <Bug pattern="URF_UNREAD_FIELD" /> - </Match> </FindBugsFilter> diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle index 988ae7ea3..cb6d23f7b 100755 --- a/java/gradle/dependencies.gradle +++ b/java/gradle/dependencies.gradle @@ -25,46 +25,47 @@ ext { } versions += [ - assertj : "3.27.3", - async : "1.4.1", - checkstyle : "8.36.1", - clojure : "1.12.0", - clojureToolsCli: "1.1.230", - commonsIo : "2.15.0", - errorProne : "2.3.3", - errorProneJavac: "9+181-r4173-1", - flink : "1.19.2", - gradle : "7.6.4", - guava : "33.3.1-jre", - hadoop : "3.4.1", - hamcrest : "3.0", - hdrhistogram : "2.2.2", - hive : "3.1.2", - httpClient : "4.5.14", - jacoco : "0.8.6", - jepsen : "0.1.5", - jetty : "9.4.57.v20241219", - jmh : "1.37", - jsr305 : "3.0.2", - junit : "4.13.2", - log4j : "2.24.3", - logCaptor : "2.10.0", - micrometer : "1.8.2", - mockito : "4.2.0", - murmur : "1.0.0", - netty : "4.1.115.Final", - osdetector : "1.6.2", - protobuf : "3.25.5", - ranger : "2.1.0", - scala211 : "2.11.12", - scala : "2.12.20", - scalatest : "3.2.19", - scopt : "4.1.0", - slf4j : "1.7.36", - spark2 : "2.4.8", - spark : "3.2.4", - spotBugs : "4.1.1", - yetus : "0.13.0" + assertj : "3.27.3", + async : "1.4.1", + checkstyle : "8.36.1", + clojure : "1.12.0", + clojureToolsCli : "1.1.230", + commonsIo : "2.15.0", + errorProne : "2.3.3", + errorProneJavac : "9+181-r4173-1", + flinkConnectorKudu : "2.0.0-1.19", + flink : "1.19.2", + gradle : "7.6.4", + guava : "33.3.1-jre", + hadoop : "3.4.1", + hamcrest : "3.0", + hdrhistogram : "2.2.2", + hive : "3.1.2", + httpClient : "4.5.14", + jacoco : "0.8.6", + jepsen : "0.1.5", + jetty : "9.4.57.v20241219", + jmh : "1.37", + jsr305 : "3.0.2", + junit : "4.13.2", + log4j : "2.24.3", + logCaptor : "2.10.0", + micrometer : "1.8.2", + mockito : "4.2.0", + murmur : "1.0.0", + netty : "4.1.115.Final", + osdetector : "1.6.2", + protobuf : "3.25.5", + ranger : "2.1.0", + scala211 : "2.11.12", + scala : "2.12.20", + scalatest : "3.2.19", + scopt : "4.1.0", + slf4j : "1.7.36", + spark2 : "2.4.8", + spark : "3.2.4", + spotBugs : "4.1.1", + yetus : "0.13.0" ] // Log the Gradle version used vs defined. @@ -93,6 +94,14 @@ libs += [ commonsIo : "commons-io:commons-io:$versions.commonsIo", errorProne : "com.google.errorprone:error_prone_core:$versions.errorProne", errorProneJavac : "com.google.errorprone:javac:$versions.errorProneJavac", + flinkClients : "org.apache.flink:flink-clients:$versions.flink", + flinkConnectorBase : "org.apache.flink:flink-connector-base:$versions.flink", + flinkCore : "org.apache.flink:flink-core:$versions.flink", + flinkApiCommon : "org.apache.flink:flink-api-common:$versions.flink", + flinkDist : "org.apache.flink:flink-dist:$versions.flink", + flinkStreamingJava : "org.apache.flink:flink-streaming-java:$versions.flink", + flinkConnectorKudu : "org.apache.flink:flink-connector-kudu:$versions.flinkConnectorKudu", + flinkTestUtils : "org.apache.flink:flink-test-utils:$versions.flink", flinkStreamingJava : "org.apache.flink:flink-streaming-java:$versions.flink", guava : "com.google.guava:guava:$versions.guava", hadoopClient : "org.apache.hadoop:hadoop-client:$versions.hadoop", diff --git a/java/kudu-replication/build.gradle b/java/kudu-replication/build.gradle index 6c1c775b6..d2f5f2e43 100644 --- a/java/kudu-replication/build.gradle +++ b/java/kudu-replication/build.gradle @@ -20,9 +20,19 @@ apply from: "$rootDir/gradle/shadow.gradle" dependencies { // todo(zchovan) confirm if any dependencies should be shaded to avoid classpath issues implementation libs.flinkStreamingJava + implementation libs.flinkConnectorKudu + implementation libs.flinkCore + implementation libs.flinkClients + implementation libs.flinkConnectorBase + implementation project(path: ":kudu-client", configuration: "shadow") testImplementation libs.junit testImplementation libs.assertj + testImplementation libs.junit + testImplementation project(path: ":kudu-test-utils", configuration: "shadow") + testImplementation libs.log4jApi + testImplementation libs.log4jCore + testImplementation libs.log4jSlf4jImpl } // kudu-replication has no public Javadoc. diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationOperationMapper.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationOperationMapper.java new file mode 100644 index 000000000..b5e44d78d --- /dev/null +++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationOperationMapper.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.apache.kudu.replication; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.connector.kudu.connector.writer.KuduOperationMapper; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.PartialRow; + +/** + * A custom implementation of {@link KuduOperationMapper} that maps a Flink {@link Row} + * to the appropriate Kudu {@link Operation} based on the row's kind. + * <p> + * If the row has kind {@link RowKind#DELETE}, a Kudu {@code DeleteIgnore} operation is created. + * Otherwise, a {@code UpsertIgnore} operation is used by default. + * </p> + * <p> + * All fields in the Flink row are written into the Kudu {@link PartialRow}. + * </p> + */ +public class CustomReplicationOperationMapper implements KuduOperationMapper<Row> { + private static final long serialVersionUID = 3364944402413430711L; + + public Operation createBaseOperation(Row input, KuduTable table) { + RowKind kind = input.getKind(); + if (kind == RowKind.DELETE) { + return table.newDeleteIgnore(); + } else { + return table.newUpsertIgnore(); + } + } + + @Override + public List<Operation> createOperations(Row input, KuduTable table) { + Operation operation = createBaseOperation(input, table); + PartialRow partialRow = operation.getRow(); + for (int i = 0; i < input.getArity(); i++) { + partialRow.addObject(i, input.getField(i)); + } + return Collections.singletonList(operation); + } +} diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationRowRestultConverter.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationRowRestultConverter.java new file mode 100644 index 000000000..bced43532 --- /dev/null +++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/CustomReplicationRowRestultConverter.java @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* + * TODO(mgreber): remove this file once FLINK-37473 is resolved. + * + * This class should live in the official Flink Kudu source connector. + * Until then, it's included here as a temporary workaround in the replication job source. + */ + +package org.apache.kudu.replication; + +import org.apache.flink.connector.kudu.connector.converter.RowResultConverter; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import org.apache.kudu.Schema; +import org.apache.kudu.client.RowResult; + +/** + * A custom implementation of {@link RowResultConverter} that converts a Kudu {@link RowResult} + * into a Flink {@link Row}. + */ +public class CustomReplicationRowRestultConverter implements RowResultConverter<Row> { + private static final long serialVersionUID = -2668642733689083069L; + + public CustomReplicationRowRestultConverter() { + } + + /** + * If the Kudu schema has a delete marker column (as indicated by {@link Schema#hasIsDeleted()}), + * and the row is marked as deleted, the resulting Flink row's kind is set to + * {@link RowKind#DELETE}. + * + * The special delete marker column (accessible via {@link Schema#getIsDeletedIndex()}) is + * excluded from the Flink row’s fields. All other non-null fields are copied into the Flink + * {@code Row}. + * + * @param row the Kudu {@code RowResult} to convert + * @return a Flink {@code Row} representation, with deletion semantics if applicable + */ + @Override + public Row convert(RowResult row) { + Schema schema = row.getColumnProjection(); + int columnCount = schema.hasIsDeleted() ? schema.getColumnCount() - 1 : schema.getColumnCount(); + Row values = new Row(columnCount); + + int isDeletedIndex = schema.hasIsDeleted() ? schema.getIsDeletedIndex() : -1; + + if (row.hasIsDeleted() && row.isDeleted()) { + values.setKind(RowKind.DELETE); + } + schema.getColumns().forEach(column -> { + int pos = schema.getColumnIndex(column.getName()); + // Skip the isDeleted column if it exists + if (pos == isDeletedIndex && row.hasIsDeleted()) { + return; + } + if (!row.isNull(pos)) { + values.setField(pos, row.getObject(pos)); + } + }); + + return values; + } +} diff --git a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java index 118526bcd..4c874bf05 100644 --- a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java +++ b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java @@ -15,7 +15,19 @@ package org.apache.kudu.replication; +import java.time.Duration; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.kudu.connector.KuduTableInfo; +import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig; +import org.apache.flink.connector.kudu.connector.writer.KuduWriterConfig; +import org.apache.flink.connector.kudu.sink.KuduSink; +import org.apache.flink.connector.kudu.sink.KuduSinkBuilder; +import org.apache.flink.connector.kudu.source.KuduSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.types.Row; /** * Provides a configured {@link StreamExecutionEnvironment} for the replication job. @@ -36,6 +48,29 @@ public class ReplicationEnvProvider { */ public StreamExecutionEnvironment getEnv() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // TODO(mgreber): implement and use reader config parsed from the command line. + KuduSource<Row> kuduSource = KuduSource.<Row>builder() + .setReaderConfig(KuduReaderConfig.Builder + .setMasters(jobConfig.getSourceMasterAddresses()).build()) + .setTableInfo(KuduTableInfo.forTable(jobConfig.getTableName())) + .setRowResultConverter(new CustomReplicationRowRestultConverter()) + .setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED) + .setDiscoveryPeriod(Duration.ofSeconds(jobConfig.getDiscoveryIntervalSeconds())) + .build(); + + // TODO(mgreber): implement and use writer config parsed from command line. + KuduSink<Row> kuduSink = new KuduSinkBuilder<Row>() + .setWriterConfig(KuduWriterConfig.Builder + .setMasters(jobConfig.getSinkMasterAddresses()).build()) + .setTableInfo(KuduTableInfo.forTable(jobConfig.getSinkTableName())) + .setOperationMapper(new CustomReplicationOperationMapper()) + .build(); + + env.fromSource(kuduSource, WatermarkStrategy.noWatermarks(), "KuduSource") + .returns(TypeInformation.of(Row.class)) + .sinkTo(kuduSink); + return env; } } diff --git a/java/kudu-replication/src/test/java/org/apache/kudu/replication/ReplicationTestBase.java b/java/kudu-replication/src/test/java/org/apache/kudu/replication/ReplicationTestBase.java new file mode 100644 index 000000000..4bf7ca384 --- /dev/null +++ b/java/kudu-replication/src/test/java/org/apache/kudu/replication/ReplicationTestBase.java @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.apache.kudu.replication; + +import static org.apache.kudu.test.ClientTestUtil.getAllTypesCreateTableOptions; +import static org.apache.kudu.test.ClientTestUtil.getPartialRowWithAllTypes; +import static org.apache.kudu.test.ClientTestUtil.getSchemaWithAllTypes; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Before; +import org.junit.Rule; + +import org.apache.kudu.Schema; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.RowResultIterator; +import org.apache.kudu.test.KuduTestHarness; + +public class ReplicationTestBase { + protected static final String TABLE_NAME = "replication_test_table"; + + @Rule + public final KuduTestHarness sourceHarness = new KuduTestHarness(); + @Rule + public final KuduTestHarness sinkHarness = new KuduTestHarness(); + + protected KuduClient sourceClient; + protected KuduClient sinkClient; + + @Before + public void setupClients() { + this.sourceClient = sourceHarness.getClient(); + this.sinkClient = sinkHarness.getClient(); + } + + + protected ReplicationJobConfig createDefaultJobConfig() { + ReplicationJobConfig jobConfig = ReplicationJobConfig.builder() + .setSourceMasterAddresses(sourceHarness.getMasterAddressesAsString()) + .setSinkMasterAddresses(sinkHarness.getMasterAddressesAsString()) + .setTableName(TABLE_NAME) + .setDiscoveryIntervalSeconds(2) + .build(); + return jobConfig; + } + + protected void createAllTypesTable(KuduClient client) throws Exception { + Schema schema = getSchemaWithAllTypes(); + CreateTableOptions options = getAllTypesCreateTableOptions(); + client.createTable(TABLE_NAME, schema, options); + } + + protected void insertRowsIntoAllTypesTable( + KuduClient client, int startKey, int count) throws Exception { + KuduTable table = client.openTable(TABLE_NAME); + KuduSession session = client.newSession(); + for (int i = 0; i < count; i++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + getPartialRowWithAllTypes(row, (byte) (startKey + i)); + session.apply(insert); + } + session.flush(); + session.close(); + } + + protected void verifySourceAndSinkRowsEqual(int expectedRowCount) throws Exception { + Map<Byte, RowResult> sourceMap = buildRowMapByInt8Key(sourceClient); + Map<Byte, RowResult> sinkMap = buildRowMapByInt8Key(sinkClient); + + for (int i = 0; i < expectedRowCount; i++) { + byte key = (byte) i; + + RowResult sourceRow = sourceMap.get(key); + RowResult sinkRow = sinkMap.get(key); + + if (sourceRow == null || sinkRow == null) { + throw new AssertionError("Missing row with int8 = " + key + + "\nSource has: " + (sourceRow != null) + + "\nSink has: " + (sinkRow != null)); + } + + compareRowResults(sourceRow, sinkRow, key); + } + } + + private Map<Byte, RowResult> buildRowMapByInt8Key(KuduClient client) throws Exception { + Map<Byte, RowResult> map = new HashMap<>(); + KuduTable table = client.openTable(TABLE_NAME); + KuduScanner scanner = client.newScannerBuilder(table).build(); + + while (scanner.hasMoreRows()) { + RowResultIterator it = scanner.nextRows(); + while (it.hasNext()) { + RowResult row = it.next(); + byte key = row.getByte("int8"); + map.put(key, row); + } + } + + return map; + } + + private void compareRowResults(RowResult a, RowResult b, byte key) { + Schema schema = a.getSchema(); + for (int i = 0; i < schema.getColumnCount(); i++) { + String colName = schema.getColumnByIndex(i).getName(); + + Object valA = a.isNull(i) ? null : a.getObject(i); + Object valB = b.isNull(i) ? null : b.getObject(i); + + if (!java.util.Objects.deepEquals(valA, valB)) { + throw new AssertionError("Mismatch in column '" + colName + "' for int8 = " + key + + "\nSource: " + valA + + "\nSink: " + valB); + } + } + } +} diff --git a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java new file mode 100644 index 000000000..a6b2191d7 --- /dev/null +++ b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplication.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.apache.kudu.replication; + +import static org.apache.kudu.test.ClientTestUtil.countRowsInTable; +import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kudu.client.KuduTable; + + + +public class TestReplication extends ReplicationTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(TestReplication.class); + + @Test + public void testBasicReplication() throws Exception { + createAllTypesTable(sourceClient); + insertRowsIntoAllTypesTable(sourceClient, 0, 10); + + createAllTypesTable(sinkClient); + + ReplicationEnvProvider executor = new ReplicationEnvProvider(createDefaultJobConfig()); + executor.getEnv().executeAsync(); + + KuduTable sinkTable = sinkClient.openTable(TABLE_NAME); + assertEventuallyTrue("Initial 10 rows should be replicated", + () -> countRowsInTable(sinkTable) == 10,60000); + + verifySourceAndSinkRowsEqual(10); + } + + @Test + public void testContinuousReplication() throws Exception { + createAllTypesTable(sourceClient); + createAllTypesTable(sinkClient); + + ReplicationEnvProvider executor = new ReplicationEnvProvider(createDefaultJobConfig()); + executor.getEnv().executeAsync(); + + KuduTable sinkTable = sinkClient.openTable(TABLE_NAME); + + insertRowsIntoAllTypesTable(sourceClient, 0, 10); + + assertEventuallyTrue("Initial 10 rows should be replicated", + () -> countRowsInTable(sinkTable) == 10, 60000); + + insertRowsIntoAllTypesTable(sourceClient, 10, 10); + + assertEventuallyTrue("Additional 10 rows should be replicated, total 20 rows", + () -> countRowsInTable(sinkTable) == 20, 60000); + + verifySourceAndSinkRowsEqual(20); + } +} diff --git a/java/kudu-replication/src/test/resources/log4j2.properties b/java/kudu-replication/src/test/resources/log4j2.properties new file mode 100644 index 000000000..22762a156 --- /dev/null +++ b/java/kudu-replication/src/test/resources/log4j2.properties @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +status = error +name = PropertiesConfig +appenders = console + +appender.console.type = Console +appender.console.name = STDOUT +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n + +rootLogger.level = info +rootLogger.appenderRefs = stdout +rootLogger.appenderRef.stdout.ref = STDOUT + +logger.kudu.name = org.apache.kudu +logger.kudu.level = debug diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java index 72b7a2690..3cd85d015 100644 --- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java +++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java @@ -228,13 +228,15 @@ public abstract class ClientTestUtil { return new Schema(columns); } - public static PartialRow getPartialRowWithAllTypes() { - Schema schema = getSchemaWithAllTypes(); - // Ensure we aren't missing any types - assertEquals(15, schema.getColumnCount()); + public static PartialRow getPartialRowWithAllTypes(PartialRow row, byte int8Value) { + if (row == null) { + Schema schema = getSchemaWithAllTypes(); + // Ensure we aren't missing any types + assertEquals(15, schema.getColumnCount()); + row = schema.newPartialRow(); + } - PartialRow row = schema.newPartialRow(); - row.addByte("int8", (byte) 42); + row.addByte("int8", int8Value); row.addShort("int16", (short) 43); row.addInt("int32", 44); row.addLong("int64", 45); @@ -250,9 +252,14 @@ public abstract class ClientTestUtil { row.addBinary("binary-bytebuffer", binaryBuffer); row.setNull("null"); row.addDecimal("decimal", BigDecimal.valueOf(12345, 3)); + return row; } + public static PartialRow getPartialRowWithAllTypes() { + return getPartialRowWithAllTypes(null, (byte) 42); + } + public static CreateTableOptions getAllTypesCreateTableOptions() { return new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("int8")); }