Copilot commented on code in PR #8278:
URL: https://github.com/apache/hbase/pull/8278#discussion_r3312025637
##########
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java:
##########
@@ -17,44 +17,42 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Category(MediumTests.class)
-public class TestReplicationCompressedWAL extends TestReplicationBase {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationCompressedWAL.class);
+@Tag(ReplicationTests.TAG)
+@Tag(MediumTests.TAG)
+public class TestReplicationCompressedWAL extends
TestReplicationBaseNoBeforeAll {
static final Logger LOG =
LoggerFactory.getLogger(TestReplicationCompressedWAL.class);
+ // NOTE(review): presumably the number of batches and rows per batch written by the test body
+ // (not shown in this hunk) — confirm.
static final int NUM_BATCHES = 20;
static final int NUM_ROWS_PER_BATCH = 100;
+ // Configure both clusters, enable WAL compression on the source cluster (CONF1), and only then
+ // start them. Changing configuration before startup is why this test extends
+ // TestReplicationBaseNoBeforeAll rather than TestReplicationBase.
- @BeforeClass
+ @BeforeAll
public static void setUpBeforeClass() throws Exception {
+ configureClusters(UTIL1, UTIL2);
CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
- TestReplicationBase.setUpBeforeClass();
+ startClusters();
}
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TestReplicationBase.tearDownAfterClass();
- }
+  // No tearDownAfterClass() here: the @AfterAll teardown inherited from
+  // TestReplicationBaseNoBeforeAll shuts the clusters down. A static method with the same
+  // signature would hide the inherited one, so JUnit would run only this class's copy, which
+  // delegated to TestReplicationBase instead of the base class this test actually extends.
Review Comment:
`@AfterAll` teardown here duplicates the superclass
`TestReplicationBaseNoBeforeAll#tearDownAfterClass` (also annotated with
`@AfterAll`), so JUnit 5 will execute teardown twice. Remove this method and
rely on the base class teardown (or remove `@AfterAll` from one of them) to
avoid double-closing/shutdown issues.
##########
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBaseNoBeforeAll.java:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+/**
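+ * Replication test base class without a {@code @BeforeAll} method, because some tests need to
+ * change the cluster configuration before the clusters are started. Such tests call
+ * {@code configureClusters}, apply their changes, and then call {@code startClusters}.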
+ * @see TestReplicationBase
+ */
+public class TestReplicationBaseNoBeforeAll {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationBaseNoBeforeAll.class);
+ // Connections opened by startClusters(): connection1 to UTIL1 (source), connection2 to UTIL2.
+ protected static Connection connection1;
+ protected static Connection connection2;
+ // Copy of CONF1 made in startClusters() before the source mini cluster is started
+ // (the name suggests local-filesystem use — TODO confirm with its users).
+ protected static Configuration CONF_WITH_LOCALFS;
+
+ // Admin for the source cluster.
+ protected static Admin hbaseAdmin;
+
+ // The shared test table on the source (htable1) and target (htable2) clusters.
+ protected static Table htable1;
+ protected static Table htable2;
+
+ protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
+ protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
+ // Reassigned when a cluster is restarted (restartSourceCluster / restartTargetHBaseCluster).
+ protected static Configuration CONF1 = UTIL1.getConfiguration();
+ protected static Configuration CONF2 = UTIL2.getConfiguration();
+
+ // Region server counts passed to startMiniCluster for UTIL1 and UTIL2.
+ protected static int NUM_SLAVES1 = 1;
+ protected static int NUM_SLAVES2 = 1;
+ // Rows written per loadData() call.
+ protected static final int NB_ROWS_IN_BATCH = 100;
+ protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
+ // Polling parameters: sleep between attempts (ms) and the attempt budget used by cleanUp().
+ protected static final long SLEEP_TIME = 500;
+ protected static final int NB_RETRIES = 50;
+ // NOTE(review): presumably updated by test replication endpoints — confirm.
+ protected static AtomicInteger replicateCount = new AtomicInteger();
+ protected static volatile List<WAL.Entry> replicatedEntries =
Lists.newArrayList();
+
+ protected static final TableName tableName = TableName.valueOf("test");
+ // Family created with REPLICATION_SCOPE_GLOBAL in createTable().
+ protected static final byte[] famName = Bytes.toBytes("f");
+ protected static final byte[] row = Bytes.toBytes("row");
+ // Family created with default settings (no replication scope set) in createTable().
+ protected static final byte[] noRepfamName = Bytes.toBytes("norep");
+ // Replication peer id; presumably used when registering UTIL2's cluster as a peer — TODO confirm.
+ protected static final String PEER_ID2 = "2";
+
+  /**
+   * Subclass hook: whether the test replication peer should be a serial peer (presumably read when
+   * the peer is added — TODO confirm). This base implementation always answers {@code false}.
+   */
+  protected boolean isSerialPeer() {
+    boolean serial = false;
+    return serial;
+  }
+
+  /**
+   * Subclass hook: whether the test replication peer should be a synchronous peer (presumably read
+   * when the peer is added — TODO confirm). This base implementation always answers {@code false}.
+   */
+  protected boolean isSyncPeer() {
+    boolean sync = false;
+    return sync;
+  }
+
+  /**
+   * Deletes all rows of the source table and waits until the peer table is empty, i.e. until the
+   * resulting deletes have been replicated.
+   * <p>
+   * Source WALs are rolled first so the most recent one is enqueued for replication. The peer table
+   * is polled up to {@link #NB_RETRIES} times with {@link #SLEEP_TIME} ms between polls; a poll that
+   * sees fewer rows than the previous one does not consume a retry.
+   */
+  protected final void cleanUp() throws IOException, InterruptedException {
+    // Starting and stopping replication can make us miss new logs,
+    // rolling like this makes sure the most recent one gets added to the queue
+    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
+      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+    }
+    int rowCount = UTIL1.countRows(tableName);
+    UTIL1.deleteTableData(tableName);
+    // truncating the table will send one Delete per row to the slave cluster
+    // in an async fashion, which is why we cannot just call deleteTableData on
+    // utility2 since late writes could make it to the slave in some way.
+    // Instead, we truncate the first table and wait for all the Deletes to
+    // make it to the slave.
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for truncate");
+      }
+      Result[] res;
+      // try-with-resources so the scanner is released even if next() throws
+      try (ResultScanner scanner = htable2.getScanner(scan)) {
+        res = scanner.next(rowCount);
+      }
+      if (res.length == 0) {
+        break;
+      }
+      if (res.length < lastCount) {
+        i--; // Don't increment timeout if we make progress
+      }
+      lastCount = res.length;
+      LOG.info("Still got " + res.length + " rows");
+      Thread.sleep(SLEEP_TIME);
+    }
+  }
+
+  /**
+   * Waits until {@link #htable2} contains exactly {@code expectedRows} rows.
+   * @see #waitForReplication(Table, int, int)
+   */
+  protected static void waitForReplication(int expectedRows, int retries)
+    throws IOException, InterruptedException {
+    final Table target = htable2;
+    waitForReplication(target, expectedRows, retries);
+  }
+
+  /**
+   * Polls {@code table} until a full scan returns exactly {@code expectedRows} rows, sleeping
+   * {@link #SLEEP_TIME} ms between attempts. The test fails when attempt number {@code retries} is
+   * reached; that final attempt fails without scanning.
+   */
+  protected static void waitForReplication(Table table, int expectedRows, int retries)
+    throws IOException, InterruptedException {
+    for (int attempt = 0; attempt < retries; attempt++) {
+      if (attempt == retries - 1) {
+        fail("Waited too much time for normal batch replication");
+      }
+      int seen = 0;
+      try (ResultScanner scanner = table.getScanner(new Scan())) {
+        for (Result r = scanner.next(); r != null; r = scanner.next()) {
+          seen++;
+        }
+      }
+      if (seen == expectedRows) {
+        break;
+      }
+      LOG.info("Only got " + seen + " rows");
+      Thread.sleep(SLEEP_TIME);
+    }
+  }
+
+  /**
+   * Writes {@link #NB_ROWS_IN_BATCH} rows to {@link #htable1} in family {@link #famName}.
+   * @see #loadData(String, byte[], byte[])
+   */
+  protected static void loadData(String prefix, byte[] row) throws IOException {
+    final byte[] family = famName;
+    loadData(prefix, row, family);
+  }
+
+  /**
+   * Writes {@link #NB_ROWS_IN_BATCH} rows to {@link #htable1} in a single batch. Row keys are
+   * {@code prefix + i} for i in [0, NB_ROWS_IN_BATCH); every row stores {@code row} as both the
+   * qualifier and the value in {@code familyName}.
+   */
+  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
+    List<Put> batch = new ArrayList<>(NB_ROWS_IN_BATCH);
+    int i = 0;
+    while (i < NB_ROWS_IN_BATCH) {
+      byte[] rowKey = Bytes.toBytes(prefix + i);
+      batch.add(new Put(rowKey).addColumn(familyName, row, row));
+      i++;
+    }
+    htable1.put(batch);
+  }
+
+  /**
+   * Applies the shared test settings to {@code util}'s configuration: the ZooKeeper parent znode
+   * plus the replication, ZooKeeper and housekeeping tuning values below.
+   */
+  protected static void setupConfig(HBaseTestingUtil util, String znodeParent) {
+    final Configuration c = util.getConfiguration();
+    // ZooKeeper: per-cluster parent znode and minimal recovery retries.
+    c.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
+    c.setInt("zookeeper.recovery.retry", 1);
+    c.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    // Replication source. The size capacity keeps batches small enough to trigger a sufficient
+    // number of events, yet large enough that HBaseInterClusterReplicationEndpoint still
+    // partitions entries into more than one batch sent to the peer cluster (better coverage).
+    c.setInt("replication.source.size.capacity", 102400);
+    c.setLong("replication.source.sleepforretries", 100);
+    c.setInt("replication.source.maxretriesmultiplier", 10);
+    c.setFloat("replication.source.ratio", 1.0f);
+    c.setBoolean("replication.source.eof.autorecovery", true);
+    c.setLong("replication.sleep.before.failover", 2000);
+    c.setInt("replication.stats.thread.period.seconds", 5);
+    c.setLong("hbase.serial.replication.waiting.ms", 100);
+    // Region server / master housekeeping and misc.
+    c.setInt("hbase.regionserver.maxlogs", 10);
+    c.setLong("hbase.master.logcleaner.ttl", 10);
+    c.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    c.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+  }
+
+  /**
+   * Applies {@link #setupConfig} to both clusters (parent znodes {@code /1} and {@code /2}) and
+   * additionally caps the client retry count of the second cluster.
+   */
+  protected static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) {
+    setupConfig(util1, "/1");
+    setupConfig(util2, "/2");
+    // setupConfig already set the parent znode and disabled short-circuit reads, so only the
+    // setting specific to the second cluster is applied here.
+    util2.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+  }
+
+ protected static void restartSourceCluster(int numSlaves) throws Exception {
+ Closeables.close(hbaseAdmin, true);
+ Closeables.close(htable1, true);
+ UTIL1.shutdownMiniHBaseCluster();
+ UTIL1.restartHBaseCluster(numSlaves);
+ // Invalidate the cached connection state.
+ CONF1 = UTIL1.getConfiguration();
+ hbaseAdmin = UTIL1.getAdmin();
+ Connection connection1 = UTIL1.getConnection();
Review Comment:
This method declares a local `Connection connection1` which shadows the
static field `connection1`, leaving the shared connection unmodified after a
cluster restart. This is confusing and can leave the static `connection1`
pointing at a stale cluster (and never closed here). Assign to the static field
(closing the old one if needed) or consistently use `UTIL1.getConnection()`
everywhere instead of maintaining a separate `connection1` field.
##########
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBaseNoBeforeAll.java:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+/**
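+ * Replication test base class without a {@code @BeforeAll} method, because some tests need to
+ * change the cluster configuration before the clusters are started. Such tests call
+ * {@code configureClusters}, apply their changes, and then call {@code startClusters}.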
+ * @see TestReplicationBase
+ */
+public class TestReplicationBaseNoBeforeAll {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationBaseNoBeforeAll.class);
+ // Connections opened by startClusters(): connection1 to UTIL1 (source), connection2 to UTIL2.
+ protected static Connection connection1;
+ protected static Connection connection2;
+ // Copy of CONF1 made in startClusters() before the source mini cluster is started
+ // (the name suggests local-filesystem use — TODO confirm with its users).
+ protected static Configuration CONF_WITH_LOCALFS;
+
+ // Admin for the source cluster.
+ protected static Admin hbaseAdmin;
+
+ // The shared test table on the source (htable1) and target (htable2) clusters.
+ protected static Table htable1;
+ protected static Table htable2;
+
+ protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
+ protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
+ // Reassigned when a cluster is restarted (restartSourceCluster / restartTargetHBaseCluster).
+ protected static Configuration CONF1 = UTIL1.getConfiguration();
+ protected static Configuration CONF2 = UTIL2.getConfiguration();
+
+ // Region server counts passed to startMiniCluster for UTIL1 and UTIL2.
+ protected static int NUM_SLAVES1 = 1;
+ protected static int NUM_SLAVES2 = 1;
+ // Rows written per loadData() call.
+ protected static final int NB_ROWS_IN_BATCH = 100;
+ protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
+ // Polling parameters: sleep between attempts (ms) and the attempt budget used by cleanUp().
+ protected static final long SLEEP_TIME = 500;
+ protected static final int NB_RETRIES = 50;
+ // NOTE(review): presumably updated by test replication endpoints — confirm.
+ protected static AtomicInteger replicateCount = new AtomicInteger();
+ protected static volatile List<WAL.Entry> replicatedEntries =
Lists.newArrayList();
+
+ protected static final TableName tableName = TableName.valueOf("test");
+ // Family created with REPLICATION_SCOPE_GLOBAL in createTable().
+ protected static final byte[] famName = Bytes.toBytes("f");
+ protected static final byte[] row = Bytes.toBytes("row");
+ // Family created with default settings (no replication scope set) in createTable().
+ protected static final byte[] noRepfamName = Bytes.toBytes("norep");
+ // Replication peer id; presumably used when registering UTIL2's cluster as a peer — TODO confirm.
+ protected static final String PEER_ID2 = "2";
+
+  /**
+   * Subclass hook: whether the test replication peer should be a serial peer (presumably read when
+   * the peer is added — TODO confirm). This base implementation always answers {@code false}.
+   */
+  protected boolean isSerialPeer() {
+    boolean serial = false;
+    return serial;
+  }
+
+  /**
+   * Subclass hook: whether the test replication peer should be a synchronous peer (presumably read
+   * when the peer is added — TODO confirm). This base implementation always answers {@code false}.
+   */
+  protected boolean isSyncPeer() {
+    boolean sync = false;
+    return sync;
+  }
+
+  /**
+   * Deletes all rows of the source table and waits until the peer table is empty, i.e. until the
+   * resulting deletes have been replicated.
+   * <p>
+   * Source WALs are rolled first so the most recent one is enqueued for replication. The peer table
+   * is polled up to {@link #NB_RETRIES} times with {@link #SLEEP_TIME} ms between polls; a poll that
+   * sees fewer rows than the previous one does not consume a retry.
+   */
+  protected final void cleanUp() throws IOException, InterruptedException {
+    // Starting and stopping replication can make us miss new logs,
+    // rolling like this makes sure the most recent one gets added to the queue
+    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
+      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+    }
+    int rowCount = UTIL1.countRows(tableName);
+    UTIL1.deleteTableData(tableName);
+    // truncating the table will send one Delete per row to the slave cluster
+    // in an async fashion, which is why we cannot just call deleteTableData on
+    // utility2 since late writes could make it to the slave in some way.
+    // Instead, we truncate the first table and wait for all the Deletes to
+    // make it to the slave.
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for truncate");
+      }
+      Result[] res;
+      // try-with-resources so the scanner is released even if next() throws
+      try (ResultScanner scanner = htable2.getScanner(scan)) {
+        res = scanner.next(rowCount);
+      }
+      if (res.length == 0) {
+        break;
+      }
+      if (res.length < lastCount) {
+        i--; // Don't increment timeout if we make progress
+      }
+      lastCount = res.length;
+      LOG.info("Still got " + res.length + " rows");
+      Thread.sleep(SLEEP_TIME);
+    }
+  }
+
+  /**
+   * Waits until {@link #htable2} contains exactly {@code expectedRows} rows.
+   * @see #waitForReplication(Table, int, int)
+   */
+  protected static void waitForReplication(int expectedRows, int retries)
+    throws IOException, InterruptedException {
+    final Table target = htable2;
+    waitForReplication(target, expectedRows, retries);
+  }
+
+  /**
+   * Polls {@code table} until a full scan returns exactly {@code expectedRows} rows, sleeping
+   * {@link #SLEEP_TIME} ms between attempts. The test fails when attempt number {@code retries} is
+   * reached; that final attempt fails without scanning.
+   */
+  protected static void waitForReplication(Table table, int expectedRows, int retries)
+    throws IOException, InterruptedException {
+    for (int attempt = 0; attempt < retries; attempt++) {
+      if (attempt == retries - 1) {
+        fail("Waited too much time for normal batch replication");
+      }
+      int seen = 0;
+      try (ResultScanner scanner = table.getScanner(new Scan())) {
+        for (Result r = scanner.next(); r != null; r = scanner.next()) {
+          seen++;
+        }
+      }
+      if (seen == expectedRows) {
+        break;
+      }
+      LOG.info("Only got " + seen + " rows");
+      Thread.sleep(SLEEP_TIME);
+    }
+  }
+
+  /**
+   * Writes {@link #NB_ROWS_IN_BATCH} rows to {@link #htable1} in family {@link #famName}.
+   * @see #loadData(String, byte[], byte[])
+   */
+  protected static void loadData(String prefix, byte[] row) throws IOException {
+    final byte[] family = famName;
+    loadData(prefix, row, family);
+  }
+
+  /**
+   * Writes {@link #NB_ROWS_IN_BATCH} rows to {@link #htable1} in a single batch. Row keys are
+   * {@code prefix + i} for i in [0, NB_ROWS_IN_BATCH); every row stores {@code row} as both the
+   * qualifier and the value in {@code familyName}.
+   */
+  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
+    List<Put> batch = new ArrayList<>(NB_ROWS_IN_BATCH);
+    int i = 0;
+    while (i < NB_ROWS_IN_BATCH) {
+      byte[] rowKey = Bytes.toBytes(prefix + i);
+      batch.add(new Put(rowKey).addColumn(familyName, row, row));
+      i++;
+    }
+    htable1.put(batch);
+  }
+
+  /**
+   * Applies the shared test settings to {@code util}'s configuration: the ZooKeeper parent znode
+   * plus the replication, ZooKeeper and housekeeping tuning values below.
+   */
+  protected static void setupConfig(HBaseTestingUtil util, String znodeParent) {
+    final Configuration c = util.getConfiguration();
+    // ZooKeeper: per-cluster parent znode and minimal recovery retries.
+    c.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
+    c.setInt("zookeeper.recovery.retry", 1);
+    c.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    // Replication source. The size capacity keeps batches small enough to trigger a sufficient
+    // number of events, yet large enough that HBaseInterClusterReplicationEndpoint still
+    // partitions entries into more than one batch sent to the peer cluster (better coverage).
+    c.setInt("replication.source.size.capacity", 102400);
+    c.setLong("replication.source.sleepforretries", 100);
+    c.setInt("replication.source.maxretriesmultiplier", 10);
+    c.setFloat("replication.source.ratio", 1.0f);
+    c.setBoolean("replication.source.eof.autorecovery", true);
+    c.setLong("replication.sleep.before.failover", 2000);
+    c.setInt("replication.stats.thread.period.seconds", 5);
+    c.setLong("hbase.serial.replication.waiting.ms", 100);
+    // Region server / master housekeeping and misc.
+    c.setInt("hbase.regionserver.maxlogs", 10);
+    c.setLong("hbase.master.logcleaner.ttl", 10);
+    c.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    c.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+  }
+
+  /**
+   * Applies {@link #setupConfig} to both clusters (parent znodes {@code /1} and {@code /2}) and
+   * additionally caps the client retry count of the second cluster.
+   */
+  protected static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) {
+    setupConfig(util1, "/1");
+    setupConfig(util2, "/2");
+    // setupConfig already set the parent znode and disabled short-circuit reads, so only the
+    // setting specific to the second cluster is applied here.
+    util2.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+  }
+
+  /**
+   * Shuts down and restarts the mini HBase cluster behind {@link #UTIL1} with {@code numSlaves}
+   * region servers, then rebuilds the cached source-side client objects.
+   * <p>
+   * The static {@link #connection1} is closed and replaced by a fresh connection so it no longer
+   * points at the stopped cluster; {@link #hbaseAdmin} and {@link #htable1} are obtained from that
+   * new connection, matching how {@code startClusters()} creates them.
+   */
+  protected static void restartSourceCluster(int numSlaves) throws Exception {
+    Closeables.close(hbaseAdmin, true);
+    Closeables.close(htable1, true);
+    // Release the connection to the cluster that is about to be stopped.
+    Closeables.close(connection1, true);
+    UTIL1.shutdownMiniHBaseCluster();
+    UTIL1.restartHBaseCluster(numSlaves);
+    // Invalidate the cached connection state.
+    CONF1 = UTIL1.getConfiguration();
+    // Assign the static field (previously a shadowing local variable left it stale).
+    connection1 = ConnectionFactory.createConnection(CONF1);
+    hbaseAdmin = connection1.getAdmin();
+    htable1 = connection1.getTable(tableName);
+  }
+
+ static void restartTargetHBaseCluster(int numSlaves) throws Exception {
+ Closeables.close(htable2, true);
+ UTIL2.restartHBaseCluster(numSlaves);
+ // Invalidate the cached connection state
+ CONF2 = UTIL2.getConfiguration();
+ htable2 = UTIL2.getConnection().getTable(tableName);
Review Comment:
Similar to `restartSourceCluster`, this updates `htable2` via
`UTIL2.getConnection()` but never refreshes/updates the static `connection2`
field created in `startClusters`. This can leave `connection2` stale after a
restart and makes connection ownership unclear. Consider closing/replacing the
static `connection2` here, or removing the extra `connection2` field and always
using `UTIL2.getConnection()` consistently.
##########
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBaseNoBeforeAll.java:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+/**
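+ * Replication test base class without a {@code @BeforeAll} method, because some tests need to
+ * change the cluster configuration before the clusters are started. Such tests call
+ * {@code configureClusters}, apply their changes, and then call {@code startClusters}.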
+ * @see TestReplicationBase
+ */
+public class TestReplicationBaseNoBeforeAll {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationBaseNoBeforeAll.class);
+ // Connections opened by startClusters(): connection1 to UTIL1 (source), connection2 to UTIL2.
+ protected static Connection connection1;
+ protected static Connection connection2;
+ // Copy of CONF1 made in startClusters() before the source mini cluster is started
+ // (the name suggests local-filesystem use — TODO confirm with its users).
+ protected static Configuration CONF_WITH_LOCALFS;
+
+ // Admin for the source cluster.
+ protected static Admin hbaseAdmin;
+
+ // The shared test table on the source (htable1) and target (htable2) clusters.
+ protected static Table htable1;
+ protected static Table htable2;
+
+ protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
+ protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
+ // Reassigned when a cluster is restarted (restartSourceCluster / restartTargetHBaseCluster).
+ protected static Configuration CONF1 = UTIL1.getConfiguration();
+ protected static Configuration CONF2 = UTIL2.getConfiguration();
+
+ // Region server counts passed to startMiniCluster for UTIL1 and UTIL2.
+ protected static int NUM_SLAVES1 = 1;
+ protected static int NUM_SLAVES2 = 1;
+ // Rows written per loadData() call.
+ protected static final int NB_ROWS_IN_BATCH = 100;
+ protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
+ // Polling parameters: sleep between attempts (ms) and the attempt budget used by cleanUp().
+ protected static final long SLEEP_TIME = 500;
+ protected static final int NB_RETRIES = 50;
+ // NOTE(review): presumably updated by test replication endpoints — confirm.
+ protected static AtomicInteger replicateCount = new AtomicInteger();
+ protected static volatile List<WAL.Entry> replicatedEntries =
Lists.newArrayList();
+
+ protected static final TableName tableName = TableName.valueOf("test");
+ // Family created with REPLICATION_SCOPE_GLOBAL in createTable().
+ protected static final byte[] famName = Bytes.toBytes("f");
+ protected static final byte[] row = Bytes.toBytes("row");
+ // Family created with default settings (no replication scope set) in createTable().
+ protected static final byte[] noRepfamName = Bytes.toBytes("norep");
+ // Replication peer id; presumably used when registering UTIL2's cluster as a peer — TODO confirm.
+ protected static final String PEER_ID2 = "2";
+
+  /**
+   * Subclass hook: whether the test replication peer should be a serial peer (presumably read when
+   * the peer is added — TODO confirm). This base implementation always answers {@code false}.
+   */
+  protected boolean isSerialPeer() {
+    boolean serial = false;
+    return serial;
+  }
+
+  /**
+   * Subclass hook: whether the test replication peer should be a synchronous peer (presumably read
+   * when the peer is added — TODO confirm). This base implementation always answers {@code false}.
+   */
+  protected boolean isSyncPeer() {
+    boolean sync = false;
+    return sync;
+  }
+
+  /**
+   * Deletes all rows of the source table and waits until the peer table is empty, i.e. until the
+   * resulting deletes have been replicated.
+   * <p>
+   * Source WALs are rolled first so the most recent one is enqueued for replication. The peer table
+   * is polled up to {@link #NB_RETRIES} times with {@link #SLEEP_TIME} ms between polls; a poll that
+   * sees fewer rows than the previous one does not consume a retry.
+   */
+  protected final void cleanUp() throws IOException, InterruptedException {
+    // Starting and stopping replication can make us miss new logs,
+    // rolling like this makes sure the most recent one gets added to the queue
+    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
+      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+    }
+    int rowCount = UTIL1.countRows(tableName);
+    UTIL1.deleteTableData(tableName);
+    // truncating the table will send one Delete per row to the slave cluster
+    // in an async fashion, which is why we cannot just call deleteTableData on
+    // utility2 since late writes could make it to the slave in some way.
+    // Instead, we truncate the first table and wait for all the Deletes to
+    // make it to the slave.
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for truncate");
+      }
+      Result[] res;
+      // try-with-resources so the scanner is released even if next() throws
+      try (ResultScanner scanner = htable2.getScanner(scan)) {
+        res = scanner.next(rowCount);
+      }
+      if (res.length == 0) {
+        break;
+      }
+      if (res.length < lastCount) {
+        i--; // Don't increment timeout if we make progress
+      }
+      lastCount = res.length;
+      LOG.info("Still got " + res.length + " rows");
+      Thread.sleep(SLEEP_TIME);
+    }
+  }
+
+  /**
+   * Waits until {@link #htable2} contains exactly {@code expectedRows} rows.
+   * @see #waitForReplication(Table, int, int)
+   */
+  protected static void waitForReplication(int expectedRows, int retries)
+    throws IOException, InterruptedException {
+    final Table target = htable2;
+    waitForReplication(target, expectedRows, retries);
+  }
+
+  /**
+   * Polls {@code table} until a full scan returns exactly {@code expectedRows} rows, sleeping
+   * {@link #SLEEP_TIME} ms between attempts. The test fails when attempt number {@code retries} is
+   * reached; that final attempt fails without scanning.
+   */
+  protected static void waitForReplication(Table table, int expectedRows, int retries)
+    throws IOException, InterruptedException {
+    for (int attempt = 0; attempt < retries; attempt++) {
+      if (attempt == retries - 1) {
+        fail("Waited too much time for normal batch replication");
+      }
+      int seen = 0;
+      try (ResultScanner scanner = table.getScanner(new Scan())) {
+        for (Result r = scanner.next(); r != null; r = scanner.next()) {
+          seen++;
+        }
+      }
+      if (seen == expectedRows) {
+        break;
+      }
+      LOG.info("Only got " + seen + " rows");
+      Thread.sleep(SLEEP_TIME);
+    }
+  }
+
+  /**
+   * Writes {@link #NB_ROWS_IN_BATCH} rows to {@link #htable1} in family {@link #famName}.
+   * @see #loadData(String, byte[], byte[])
+   */
+  protected static void loadData(String prefix, byte[] row) throws IOException {
+    final byte[] family = famName;
+    loadData(prefix, row, family);
+  }
+
+  /**
+   * Writes {@link #NB_ROWS_IN_BATCH} rows to {@link #htable1} in a single batch. Row keys are
+   * {@code prefix + i} for i in [0, NB_ROWS_IN_BATCH); every row stores {@code row} as both the
+   * qualifier and the value in {@code familyName}.
+   */
+  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
+    List<Put> batch = new ArrayList<>(NB_ROWS_IN_BATCH);
+    int i = 0;
+    while (i < NB_ROWS_IN_BATCH) {
+      byte[] rowKey = Bytes.toBytes(prefix + i);
+      batch.add(new Put(rowKey).addColumn(familyName, row, row));
+      i++;
+    }
+    htable1.put(batch);
+  }
+
+  /**
+   * Applies the shared test settings to {@code util}'s configuration: the ZooKeeper parent znode
+   * plus the replication, ZooKeeper and housekeeping tuning values below.
+   */
+  protected static void setupConfig(HBaseTestingUtil util, String znodeParent) {
+    final Configuration c = util.getConfiguration();
+    // ZooKeeper: per-cluster parent znode and minimal recovery retries.
+    c.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
+    c.setInt("zookeeper.recovery.retry", 1);
+    c.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    // Replication source. The size capacity keeps batches small enough to trigger a sufficient
+    // number of events, yet large enough that HBaseInterClusterReplicationEndpoint still
+    // partitions entries into more than one batch sent to the peer cluster (better coverage).
+    c.setInt("replication.source.size.capacity", 102400);
+    c.setLong("replication.source.sleepforretries", 100);
+    c.setInt("replication.source.maxretriesmultiplier", 10);
+    c.setFloat("replication.source.ratio", 1.0f);
+    c.setBoolean("replication.source.eof.autorecovery", true);
+    c.setLong("replication.sleep.before.failover", 2000);
+    c.setInt("replication.stats.thread.period.seconds", 5);
+    c.setLong("hbase.serial.replication.waiting.ms", 100);
+    // Region server / master housekeeping and misc.
+    c.setInt("hbase.regionserver.maxlogs", 10);
+    c.setLong("hbase.master.logcleaner.ttl", 10);
+    c.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    c.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+  }
+
+  /**
+   * Applies {@link #setupConfig} to both clusters (parent znodes {@code /1} and {@code /2}) and
+   * additionally caps the client retry count of the second cluster.
+   */
+  protected static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) {
+    setupConfig(util1, "/1");
+    setupConfig(util2, "/2");
+    // setupConfig already set the parent znode and disabled short-circuit reads, so only the
+    // setting specific to the second cluster is applied here.
+    util2.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+  }
+
+  /**
+   * Shuts down and restarts the mini HBase cluster behind {@link #UTIL1} with {@code numSlaves}
+   * region servers, then rebuilds the cached source-side client objects.
+   * <p>
+   * The static {@link #connection1} is closed and replaced by a fresh connection so it no longer
+   * points at the stopped cluster; {@link #hbaseAdmin} and {@link #htable1} are obtained from that
+   * new connection, matching how {@code startClusters()} creates them.
+   */
+  protected static void restartSourceCluster(int numSlaves) throws Exception {
+    Closeables.close(hbaseAdmin, true);
+    Closeables.close(htable1, true);
+    // Release the connection to the cluster that is about to be stopped.
+    Closeables.close(connection1, true);
+    UTIL1.shutdownMiniHBaseCluster();
+    UTIL1.restartHBaseCluster(numSlaves);
+    // Invalidate the cached connection state.
+    CONF1 = UTIL1.getConfiguration();
+    // Assign the static field (previously a shadowing local variable left it stale).
+    connection1 = ConnectionFactory.createConnection(CONF1);
+    hbaseAdmin = connection1.getAdmin();
+    htable1 = connection1.getTable(tableName);
+  }
+
+  /**
+   * Restarts the mini HBase cluster behind {@link #UTIL2} with {@code numSlaves} region servers and
+   * re-creates {@link #connection2} and {@link #htable2}, so neither keeps pointing at the old
+   * cluster instance.
+   */
+  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
+    Closeables.close(htable2, true);
+    // Also release the connection created by startClusters() (or by a previous restart).
+    Closeables.close(connection2, true);
+    UTIL2.restartHBaseCluster(numSlaves);
+    // Invalidate the cached connection state
+    CONF2 = UTIL2.getConfiguration();
+    connection2 = ConnectionFactory.createConnection(CONF2);
+    htable2 = connection2.getTable(tableName);
+  }
+
+  /**
+   * Creates {@code tableName} on both clusters, pre-split with
+   * {@link HBaseTestingUtil#KEYS_FOR_HBA_CREATE_TABLE}, and waits until all its regions are
+   * assigned. Family {@link #famName} keeps up to 100 versions with
+   * {@link HConstants#REPLICATION_SCOPE_GLOBAL} scope; family {@link #noRepfamName} is created with
+   * default settings.
+   */
+  protected static void createTable(TableName tableName) throws IOException {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+      .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
+    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName));
+    TableDescriptor descriptor = builder.build();
+
+    UTIL1.createTable(descriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
+    UTIL2.createTable(descriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
+    UTIL1.waitUntilAllRegionsAssigned(tableName);
+    UTIL2.waitUntilAllRegionsAssigned(tableName);
+  }
+
+  /**
+   * Starts ZooKeeper and both mini clusters, then opens the shared connections, the source admin
+   * and both tables, creating {@link #tableName} on each cluster.
+   * <p>
+   * Both clusters share UTIL1's mini ZooKeeper cluster and are separated by the parent znodes set in
+   * {@link #configureClusters}. Callers are expected to invoke that (and apply any test-specific
+   * configuration) before calling this method.
+   */
+  protected static void startClusters() throws Exception {
+    // One mini ZooKeeper ensemble, shared by both clusters.
+    UTIL1.startMiniZKCluster();
+    final MiniZooKeeperCluster sharedZk = UTIL1.getZkCluster();
+    LOG.info("Setup first Zk");
+    UTIL2.setZkCluster(sharedZk);
+    LOG.info("Setup second Zk");
+
+    // Snapshot of the source configuration taken before its mini cluster starts.
+    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
+    UTIL1.startMiniCluster(NUM_SLAVES1);
+    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
+    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
+    UTIL2.startMiniCluster(NUM_SLAVES2);
+
+    connection1 = ConnectionFactory.createConnection(CONF1);
+    connection2 = ConnectionFactory.createConnection(CONF2);
+    hbaseAdmin = connection1.getAdmin();
+
+    createTable(tableName);
+    htable1 = connection1.getTable(tableName);
+    htable2 = connection2.getTable(tableName);
+  }
+
+  /**
+   * Returns whether the cluster behind {@code util} has a replication peer with id {@code peerId}.
+   */
+  private boolean peerExist(String peerId, HBaseTestingUtil util) throws IOException {
+    return util.getAdmin().listReplicationPeers().stream().map(p -> p.getPeerId())
+      .anyMatch(id -> peerId.equals(id));
+  }
+
+ // can be overridden in tests, in case you need to use a zk based uri or the old style uri
Review Comment:
Grammar: "can be override" should be "can be overridden".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]