kadirozde commented on code in PR #2192:
URL: https://github.com/apache/phoenix/pull/2192#discussion_r2182839169


##########
phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java:
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.DEFAULT_HA_GROUP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.replication.tool.LogFileAnalyzer;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReplicationLogGroupIT extends ParallelStatsDisabledIT {
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogGroupIT.class);
+
+    @Rule
+    public TestName name = new TestName();
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
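+        // enable synchronous replication so committed mutations are written to the replication log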
+        props.put(QueryServices.SYNCHRONOUS_REPLICATION_ENABLED, Boolean.TRUE.toString());
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Before
+    public void beforeTest() throws Exception {
+        LOG.info("Starting test {}", name.getMethodName());
+    }
+
+    @After
+    public void afterTest() throws Exception {
+        LOG.info("Starting cleanup for test {}", name.getMethodName());
+        cleanupLogsFolder(standbyUri);
+        LOG.info("Ending cleanup for test {}", name.getMethodName());
+    }
+
+    /**
+     * Deletes all the shards under the top-level replication log directory.
+     * @throws IOException if listing or deleting the shard directories fails
+     */
+    private void cleanupLogsFolder(URI source) throws IOException {
+        FileSystem fs = FileSystem.get(config);
+        Path dir = new Path(source.getPath());
+        FileStatus[] statuses = fs.listStatus(dir);
+        for (FileStatus status : statuses) {
+            Path shard = status.getPath();
+            if (status.isDirectory()) {
+                fs.delete(shard, true);
+            }
+        }
+    }
+
+    private ReplicationLogGroup getReplicationLogGroup() throws IOException {
+        HRegionServer rs = getUtility().getHBaseCluster().getRegionServer(0);
+        return ReplicationLogGroup.get(config, rs.getServerName(), DEFAULT_HA_GROUP);
+    }
+
+    private Map<String, List<Mutation>> groupLogsByTable() throws Exception {
+        ReplicationLogGroup log = getReplicationLogGroup();
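+        // close the current writer so pending log entries are flushed and visible to the analyzer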
+        log.getActiveWriter().closeCurrentWriter();
+        LogFileAnalyzer analyzer = new LogFileAnalyzer();
+        analyzer.setConf(config);
+        String[] args = {"--check", standbyUri.getPath()};
+        assertEquals(0, analyzer.run(args));
+        return analyzer.groupLogsByTable(standbyUri.getPath());
+    }
+
+    private int getCountForTable(Map<String, List<Mutation>> logsByTable,
+                                 String tableName) throws Exception {
+        List<Mutation> mutations = logsByTable.get(tableName);
+        return mutations != null ? mutations.size() : 0;
+    }
+
+    private void verifyReplication(Connection conn,
+                                   Map<String, Integer> expected) throws Exception {
+        Map<String, List<Mutation>> mutationsByTable = groupLogsByTable();
+        dumpTableLogCount(mutationsByTable);
+        for (Map.Entry<String, Integer> entry : expected.entrySet()) {
+            String tableName = entry.getKey();
+            int expectedMutationCount = entry.getValue();
+            List<Mutation> mutations = mutationsByTable.get(tableName);
+            int actualMutationCount = mutations != null ? mutations.size() : 0;
+            try {
+                if (!tableName.equals(SYSTEM_CATALOG_NAME)) {
+                    assertEquals(String.format("For table %s", tableName),
+                            expectedMutationCount, actualMutationCount);
+                } else {
+                    // special handling for syscat
+                    assertTrue("For SYSCAT", actualMutationCount >= 
expectedMutationCount);
+                }
+            } catch (AssertionError e) {
+                TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+                throw e;
+            }
+        }
+    }
+
+    private void dumpTableLogCount(Map<String, List<Mutation>> mutationsByTable) {
+        LOG.info("Dump table log count for test {}", name.getMethodName());
+        for (Map.Entry<String, List<Mutation>> table : mutationsByTable.entrySet()) {
+            LOG.info("#Log entries for {} = {}", table.getKey(), table.getValue().size());
+        }
+    }
+
+    private void moveRegionToServer(TableName tableName, ServerName sn) throws Exception {
+        HBaseTestingUtility util = getUtility();
+        try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
+            String regEN = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
+            while (!sn.equals(locator.getAllRegionLocations().get(0).getServerName())) {
+                LOG.info("Moving region {} of table {} to server {}", regEN, tableName, sn);
+                util.getAdmin().move(Bytes.toBytes(regEN), sn);
+                Thread.sleep(100);
+            }
+            LOG.info("Moved region {} of table {} to server {}", regEN, 
tableName, sn);
+        }
+    }
+
+    private PhoenixTestBuilder.SchemaBuilder createViewHierarchy() throws Exception {
+        // Define the test schema.
+        // 1. Table with columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. GlobalView with columns => (ID, COL4, COL5, COL6), PK => (ID)
+        // 3. TenantView with columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+        PhoenixTestBuilder.SchemaBuilder.TableOptions tableOptions =
+                PhoenixTestBuilder.SchemaBuilder.TableOptions.withDefaults();
+        PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions
+                globalViewOptions = PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults();
+        PhoenixTestBuilder.SchemaBuilder.TenantViewOptions
+                tenantViewWithOverrideOptions = PhoenixTestBuilder.SchemaBuilder.TenantViewOptions.withDefaults();
+        PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions
+                tenantViewIndexOverrideOptions = PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions.withDefaults();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            schemaBuilder.withTableOptions(tableOptions)
+                    .withGlobalViewOptions(globalViewOptions)
+                    .withTenantViewOptions(tenantViewWithOverrideOptions)
+                    .withTenantViewIndexOptions(tenantViewIndexOverrideOptions)
+                    .buildWithNewTenant();
+        }
+        return schemaBuilder;
+    }
+
+    @Test
+    public void testAppendAndSync() throws Exception {
+        final String tableName = "T_" + generateUniqueName();
+        final String indexName1 = "I_" + generateUniqueName();
+        final String indexName2 = "I_" + generateUniqueName();
+        final String indexName3 = "L_" + generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl = String.format("create table %s (id1 integer not null, 
" +
+                    "id2 integer not null, val1 varchar, val2 varchar " +
+                    "constraint pk primary key (id1, id2))", tableName);
+            conn.createStatement().execute(ddl);
+            ddl = String.format("create index %s on %s (val1) include (val2)",
+                    indexName1, tableName);
+            conn.createStatement().execute(ddl);
+            ddl = String.format("create index %s on %s (val2) include (val1)",
+                    indexName2, tableName);
+            conn.createStatement().execute(ddl);
+            ddl = String.format("create local index %s on %s (id2,val1) 
include (val2)",
+                    indexName3, tableName);
+            conn.createStatement().execute(ddl);
+            conn.commit();
+            PreparedStatement stmt = conn.prepareStatement(
+                    "upsert into " + tableName + " VALUES(?, ?, ?, ?)");
+            // upsert 50 rows
+            int rowCount = 50;
+            for (int i = 0; i < 5; ++i) {
+                for (int j = 0; j < 10; ++j) {
+                    stmt.setInt(1, i);
+                    stmt.setInt(2, j);
+                    stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
+                    stmt.setString(4, null);
+                    stmt.executeUpdate();
+                }
+                conn.commit();
+            }
+            // do some atomic upserts which will be ignored and therefore not replicated
+            stmt = conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?) " +
+                    "ON DUPLICATE KEY IGNORE");
+            conn.setAutoCommit(true);
+            for (int i = 0; i < 5; ++i) {
+                for (int j = 0; j < 2; ++j) {
+                    stmt.setInt(1, i);
+                    stmt.setInt(2, j);
+                    stmt.setString(3, null);
+                    assertEquals(0, stmt.executeUpdate());
+                }
+            }
+            // verify the correctness of the index
+            IndexToolIT.verifyIndexTable(tableName, indexName1, conn);
+            // verify replication
+            Map<String, Integer> expected = Maps.newHashMap();
+            // the expected counts below are multiples of the row count since the atomic
+            // upsert mutations will be ignored and therefore not replicated
+            expected.put(tableName, rowCount * 3); // Put + Delete + local index update
+            // for index1 unverified + verified + delete (Delete column)
+            expected.put(indexName1, rowCount * 3);
+            // for index2 unverified + verified since the null column is part of the row key
+            expected.put(indexName2, rowCount * 2);
+            // we didn't create any tenant views so no change in the syscat entries
+            expected.put(SYSTEM_CATALOG_NAME, 0);
+            expected.put(SYSTEM_CHILD_LINK_NAME, 0);
+            verifyReplication(conn, expected);
+        }
+    }
+
+    @Test
+    public void testPreWALRestoreSkip() throws Exception {
+        HBaseTestingUtility util = getUtility();
+        MiniHBaseCluster cluster = util.getHBaseCluster();
+        final String tableName = "T_" + generateUniqueName();
+        final String indexName = "I_" + generateUniqueName();
+        TableName table = TableName.valueOf(tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl = String.format("create table %s (id1 integer not null, 
" +
+                    "id2 integer not null, val1 varchar, val2 varchar " +
+                    "constraint pk primary key (id1, id2))", tableName);
+            conn.createStatement().execute(ddl);
+            ddl = String.format("create index %s on %s (val1) include (val2)",
+                    indexName, tableName);
+            conn.createStatement().execute(ddl);
+            conn.commit();
+        }
+        JVMClusterUtil.RegionServerThread rs2 = cluster.startRegionServer();
+        ServerName sn2 = rs2.getRegionServer().getServerName();
+        moveRegionToServer(table, sn2);
+        moveRegionToServer(TableName.valueOf(SYSTEM_CATALOG_NAME), sn2);
+        moveRegionToServer(TableName.valueOf(SYSTEM_CHILD_LINK_NAME), sn2);
+        int rowCount = 50;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "upsert into " + tableName + " VALUES(?, ?, ?, ?)");
+            // upsert 50 rows
+            for (int i = 0; i < 5; ++i) {
+                for (int j = 0; j < 10; ++j) {
+                    stmt.setInt(1, i);
+                    stmt.setInt(2, j);
+                    stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
+                    stmt.setString(4, null); // Generate a DeleteColumn cell
+                    stmt.executeUpdate();
+                }
+                // we want to simulate an RS crash after updating the memstore and WAL
+                IndexRegionObserver.setIgnoreSyncReplicationForTesting(true);
+                conn.commit();
+            }
+            // Create tenant views for syscat and child link replication
+            createViewHierarchy();
+        } finally {
+            IndexRegionObserver.setIgnoreSyncReplicationForTesting(false);
+        }
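+        // kill the RS hosting the regions; WAL replay on another server should replicate the skipped mutations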
+        cluster.killRegionServer(rs2.getRegionServer().getServerName());
+        Threads.sleep(20000); // just to be sure that the kill has fully taken effect
+        util.waitUntilAllRegionsAssigned(table);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            Map<String, Integer> expected = Maps.newHashMap();
+            // For each row 1 Put + 1 Delete (DeleteColumn)
+            expected.put(tableName, rowCount * 2);
+            // unverified + verified + delete (Delete column)
+            expected.put(indexName, rowCount * 3);
+            // 1 tenant view was created
+            expected.put(SYSTEM_CHILD_LINK_NAME, 1);
+            // at least 1 log entry for syscat
+            expected.put(SYSTEM_CATALOG_NAME, 1);
+            verifyReplication(conn, expected);
+        }
+    }
+
+    @Test
+    public void testSystemTables() throws Exception {
+        createViewHierarchy();
+        Map<String, List<Mutation>> logsByTable = groupLogsByTable();
+        dumpTableLogCount(logsByTable);
+        Map<String, List<Mutation>> systemTables = logsByTable.entrySet().stream()
+                .filter(entry -> entry.getKey().startsWith(QueryConstants.SYSTEM_SCHEMA_NAME))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        assertEquals(2, systemTables.size());
+        assertEquals(1, getCountForTable(systemTables, SYSTEM_CHILD_LINK_NAME));
+        assertTrue(getCountForTable(systemTables, SYSTEM_CATALOG_NAME) > 0);

Review Comment:
   Should this be an equality check, i.e. `== 1`?
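   
   If so, a minimal sketch of the suggested change (assuming exactly one SYSTEM.CATALOG log entry is expected at this point) would be:
   ```java
   assertEquals(1, getCountForTable(systemTables, SYSTEM_CATALOG_NAME));
   ```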


