[
https://issues.apache.org/jira/browse/HIVE-26265?focusedWorklogId=784279&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-784279
]
ASF GitHub Bot logged work on HIVE-26265:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Jun/22 15:41
Start Date: 23/Jun/22 15:41
Worklog Time Spent: 10m
Work Description: pvary commented on code in PR #3365:
URL: https://github.com/apache/hive/pull/3365#discussion_r905185430
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationFilterTransactions.java:
##########
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.*;
+import
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static
org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
+
+/**
+ * TestReplicationFilterTransactions - Tests transaction-event replication with and without REPL_FILTER_TRANSACTIONS.
+ */
+public class TestReplicationFilterTransactions {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestReplicationFilterTransactions.class);
+
+  // Run identifier: lower-cased canonical class name with '.' replaced by '_', plus start time.
+  private static final String tid =
+      TestReplicationFilterTransactions.class.getCanonicalName().toLowerCase().replace('.', '_')
+          + "_" + System.currentTimeMillis();
+
+  // Per-run path under the "test.warehouse.dir" system property (default "/tmp").
+  private static final String TEST_PATH =
+      System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
+
+  // Local scratch space; alterBucketFile() keeps the junk file and saved bucket files here.
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  /**
+   * Metastore event listener registered on the primary warehouse. It counts open, commit and
+   * abort transaction callbacks in static counters that the test reads and resets.
+   */
+  public static class PrimaryEventListenerTestImpl extends MetaStoreEventListener {
+    // Static so the test can read the counts without a reference to the listener instance.
+    private static final AtomicInteger countOpenTxn = new AtomicInteger(0);
+    private static final AtomicInteger countCommitTxn = new AtomicInteger(0);
+    private static final AtomicInteger countAbortTxn = new AtomicInteger(0);
+
+    public PrimaryEventListenerTestImpl(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn,
+        SQLGenerator sqlGenerator) throws MetaException {
+      super.onOpenTxn(openTxnEvent, dbConn, sqlGenerator);
+      countOpenTxn.incrementAndGet();
+    }
+
+    @Override
+    public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn,
+        SQLGenerator sqlGenerator) throws MetaException {
+      super.onCommitTxn(commitTxnEvent, dbConn, sqlGenerator);
+      countCommitTxn.incrementAndGet();
+    }
+
+    @Override
+    public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn,
+        SQLGenerator sqlGenerator) throws MetaException {
+      super.onAbortTxn(abortTxnEvent, dbConn, sqlGenerator);
+      countAbortTxn.incrementAndGet();
+    }
+
+    /** @return number of onOpenTxn callbacks since the last reset */
+    public static int getCountOpenTxn() {
+      return countOpenTxn.get();
+    }
+
+    /** @return number of onCommitTxn callbacks since the last reset */
+    public static int getCountCommitTxn() {
+      return countCommitTxn.get();
+    }
+
+    /** @return number of onAbortTxn callbacks since the last reset */
+    public static int getCountAbortTxn() {
+      return countAbortTxn.get();
+    }
+
+    /** Sets all three counters back to zero. */
+    public static void reset() {
+      countOpenTxn.set(0);
+      countCommitTxn.set(0);
+      countAbortTxn.set(0);
+    }
+  }
+
+  /**
+   * Metastore event listener registered on the replica warehouse. It counts open, commit and
+   * abort transaction callbacks in static counters that the test reads and resets.
+   */
+  public static class ReplicaEventListenerTestImpl extends MetaStoreEventListener {
+    // Static so the test can read the counts without a reference to the listener instance.
+    private static final AtomicInteger countOpenTxn = new AtomicInteger(0);
+    private static final AtomicInteger countCommitTxn = new AtomicInteger(0);
+    private static final AtomicInteger countAbortTxn = new AtomicInteger(0);
+
+    // Intended to hold source txn id -> target txn id pairs read from REPL_TXN_MAP.
+    // Nothing in this class populates it (see onOpenTxn), so it stays empty unless other code
+    // fills it.
+    private static final Map<Long, Long> txnMapping = new ConcurrentHashMap<>();
+
+    public ReplicaEventListenerTestImpl(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn,
+        SQLGenerator sqlGenerator) throws MetaException {
+      super.onOpenTxn(openTxnEvent, dbConn, sqlGenerator);
+      countOpenTxn.getAndIncrement();
+      // NOTE: reading REPL_TXN_MAP from this callback (updateTxnMapping(txnMapping)) would let
+      // the test verify the target-to-source txn id mapping. Doing so caused
+      // "org.apache.hadoop.hive.ql.lockmgr.LockException: Error communicating with the
+      // metastore", so it is intentionally not done here.
+    }
+
+    @Override
+    public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn,
+        SQLGenerator sqlGenerator) throws MetaException {
+      super.onCommitTxn(commitTxnEvent, dbConn, sqlGenerator);
+      countCommitTxn.getAndIncrement();
+    }
+
+    @Override
+    public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn,
+        SQLGenerator sqlGenerator) throws MetaException {
+      super.onAbortTxn(abortTxnEvent, dbConn, sqlGenerator);
+      countAbortTxn.getAndIncrement();
+    }
+
+    /** @return number of onOpenTxn callbacks since the last reset */
+    public static int getCountOpenTxn() {
+      return countOpenTxn.get();
+    }
+
+    /** @return number of onCommitTxn callbacks since the last reset */
+    public static int getCountCommitTxn() {
+      return countCommitTxn.get();
+    }
+
+    /** @return number of onAbortTxn callbacks since the last reset */
+    public static int getCountAbortTxn() {
+      return countAbortTxn.get();
+    }
+
+    /** @return a snapshot copy of the recorded source -> target txn id mapping */
+    public static Map<Long, Long> getTxnMapping() {
+      return new HashMap<>(txnMapping);
+    }
+
+    /** Sets all counters back to zero and clears the txn id mapping. */
+    public static void reset() {
+      countOpenTxn.set(0);
+      countCommitTxn.set(0);
+      countAbortTxn.set(0);
+      txnMapping.clear();
+    }
+  }
+
+  /** Immutable triple of open, commit and abort transaction event counts. */
+  static final class EventCount {
+    private final int countOpenTxn;
+    private final int countCommitTxn;
+    private final int countAbortTxn;
+
+    public EventCount(int countOpenTxn, int countCommitTxn, int countAbortTxn) {
+      this.countOpenTxn = countOpenTxn;
+      this.countCommitTxn = countCommitTxn;
+      this.countAbortTxn = countAbortTxn;
+    }
+
+    public int getCountOpenTxn() {
+      return countOpenTxn;
+    }
+
+    public int getCountCommitTxn() {
+      return countCommitTxn;
+    }
+
+    public int getCountAbortTxn() {
+      return countAbortTxn;
+    }
+  }
+
+  @Rule
+  public final TestName testName = new TestName();
+
+  // Source and target warehouses; both are assigned in setup() before every test.
+  static WarehouseInstance primary;
+  static WarehouseInstance replica;
+
+  static HiveConf dfsConf;
+
+  // Replicated source database, its replica-side name, and a non-replicated source database.
+  String primaryDbName;
+  String replicatedDbName;
+  String otherDbName;
+
+  // Transaction events expected on the primary during the incremental phase.
+  EventCount expected;
+
+
+  /**
+   * Builds the configuration overrides shared by the primary and replica warehouses.
+   *
+   * @param dfsUri URI of the MiniDFSCluster file system, used as fs.defaultFS
+   * @param listenerClassName metastore event listener class to register
+   * @return a new, mutable map of configuration overrides
+   */
+  private Map<String, String> setupConf(String dfsUri, String listenerClassName) {
+    Map<String, String> overrides = new HashMap<>();
+    overrides.put("fs.defaultFS", dfsUri);
+    // Transaction manager settings.
+    overrides.put("hive.support.concurrency", "true");
+    overrides.put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+    overrides.put("hive.metastore.client.capability.check", "false");
+    overrides.put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+    // Relaxed checks and test-mode switches.
+    overrides.put("hive.strict.checks.bucketing", "false");
+    overrides.put("hive.mapred.mode", "nonstrict");
+    overrides.put("mapred.input.dir.recursive", "true");
+    overrides.put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+    overrides.put("hive.stats.autogather", "false");
+    overrides.put("hive.in.repl.test", "true");
+    // Counting listener plus gzip-JSON event message encoding.
+    overrides.put(MetastoreConf.ConfVars.EVENT_LISTENERS.getVarname(), listenerClassName);
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    return overrides;
+  }
+
+  /**
+   * Prepares the environment for each test:
+   * starts a fresh two-datanode MiniDFSCluster;
+   * creates the primary and replica warehouses on it, each with its own counting listener;
+   * creates the replicated and the non-replicated source databases;
+   * resets the listener counters.
+   */
+  @Before
+  public void setup() throws Throwable {
+    dfsConf = new HiveConf(TestReplicationFilterTransactions.class);
+    dfsConf.set("dfs.client.use.datanode.hostname", "true");
+    dfsConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    dfsConf.set("dfs.namenode.acls.enabled", "true");
+
+    MiniDFSCluster miniDFSCluster =
+        new MiniDFSCluster.Builder(dfsConf).numDataNodes(2).format(true).build();
+    String dfsUri = miniDFSCluster.getFileSystem().getUri().toString();
+
+    Map<String, String> primaryConf =
+        setupConf(dfsUri, PrimaryEventListenerTestImpl.class.getName());
+    primary = new WarehouseInstance(LOG, miniDFSCluster, primaryConf);
+
+    Map<String, String> replicaConf =
+        setupConf(dfsUri, ReplicaEventListenerTestImpl.class.getName());
+    // Point the replica's REPLDIR at the primary's dump directory.
+    replicaConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, replicaConf);
+
+    String methodName = testName.getMethodName();
+    primaryDbName = methodName + "_" + System.currentTimeMillis();
+    replicatedDbName = "replicated_" + primaryDbName;
+    primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '"
+        + SOURCE_OF_REPLICATION + "' = '1,2,3')");
+    otherDbName = methodName + "_other_" + System.currentTimeMillis();
+    primary.run("create database " + otherDbName);
+
+    // Discard anything counted during setup.
+    PrimaryEventListenerTestImpl.reset();
+    ReplicaEventListenerTestImpl.reset();
+    // prepareIncrementalData() runs, in each of the two databases, three inserts and one
+    // update that is forced to abort. Assuming one transaction per statement, that is
+    // 8 opened, 6 committed and 2 aborted in total.
+    expected = new EventCount(8, 6, 2);
+  }
+
+  /**
+   * Reads REPL_TXN_MAP from the replica metastore and records every row in {@code map}.
+   *
+   * @param map destination map, keyed by source txn id (RTM_SRC_TXN_ID) with the target txn
+   *     id (RTM_TARGET_TXN_ID) as value
+   * @throws Exception if the metastore query fails or a row cannot be parsed
+   */
+  static void updateTxnMapping(Map<Long, Long> map) throws Exception {
+    String result = TestTxnDbUtil.queryToString(replica.hiveConf,
+        "SELECT \"RTM_TARGET_TXN_ID\", \"RTM_SRC_TXN_ID\" FROM \"REPL_TXN_MAP\"", false);
+    Assert.assertNotNull(result);
+
+    for (String row : result.split("\n")) {
+      String trimmed = row.trim();
+      if (trimmed.isEmpty()) {
+        // An empty result or a trailing newline yields blank rows; skip them.
+        continue;
+      }
+      // Split on any run of whitespace: splitting on a single " " yields empty tokens
+      // whenever the columns are separated by more than one space.
+      String[] cols = trimmed.split("\\s+");
+      Assert.assertTrue("Unexpected REPL_TXN_MAP row: '" + row + "'", cols.length >= 2);
+      long tgtTxnId = Long.parseLong(cols[0]);
+      long srcTxnId = Long.parseLong(cols[1]);
+      map.put(srcTxnId, tgtTxnId);
+    }
+  }
+
+  /**
+   * Drops the databases created by the test and closes both warehouse instances. The
+   * instances are closed even if a drop fails, so a failing test does not leak them.
+   */
+  @After
+  public void tearDown() throws Throwable {
+    try {
+      primary.run("drop database if exists " + primaryDbName + " cascade");
+      primary.run("drop database if exists " + otherDbName + " cascade");
+      replica.run("drop database if exists " + replicatedDbName + " cascade");
+    } finally {
+      try {
+        primary.close();
+      } finally {
+        replica.close();
+      }
+    }
+  }
+
+  /**
+   * Creates ACID tables t1 (bucketed), t2 (partitioned, bucketed) and t3 with initial rows.
+   * They are created in both the replicated database and the non-replicated one.
+   */
+  private void prepareBootstrapData() throws Throwable {
+    // primaryDbName is replicated; t1 and t2 are ACID tables with initial data.
+    // t3 is an ACID table with 2 initial rows. Its bucket file is later corrupted by
+    // prepareAbortTxn() to force an aborted transaction.
+    primary.run("use " + primaryDbName)
+        .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+            "tblproperties (\"transactional\"=\"true\")")
+        .run("insert into t1 values(1)")
+        .run("create table t2 (place string) partitioned by (country string) clustered by(place) " +
+            "into 3 buckets stored as orc tblproperties (\"transactional\"=\"true\")")
+        .run("insert into t2 partition(country='india') values ('bangalore')")
+        .run("create table t3 (id int) stored as orc tblproperties (\"transactional\"=\"true\")")
+        .run("insert into t3 values(111), (222)");
+
+    // otherDbName is not replicated, but contains the same kinds of ACID tables.
+    primary.run("use " + otherDbName)
+        .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+            "tblproperties (\"transactional\"=\"true\")")
+        .run("insert into t1 values(100)")
+        .run("create table t2 (place string) partitioned by (country string) clustered by(place) " +
+            "into 3 buckets stored as orc tblproperties (\"transactional\"=\"true\")")
+        .run("insert into t2 partition(country='usa') values ('san francisco')")
+        .run("create table t3 (id int) stored as orc tblproperties (\"transactional\"=\"true\")")
+        .run("insert into t3 values(1110), (2220)");
+  }
+
+  /**
+   * Swaps a table's bucket file on the primary's DFS with a junk file, or restores the original.
+   *
+   * @param warehouseRoot primary warehouse root directory
+   * @param dbName database owning the table
+   * @param tableName table whose bucket file is swapped
+   * @param toCorrupted true to move the real bucket file into local scratch space and put a junk
+   *     file in its place; false to move the saved original back
+   * @throws IOException if a file system operation fails
+   */
+  private void alterBucketFile(Path warehouseRoot, String dbName, String tableName,
+      boolean toCorrupted) throws IOException {
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+
+    String bucket = "bucket_00000_0";
+    // NOTE(review): assumes the table's data sits in delta_0000001_0000001_0000 — confirm if
+    // the bootstrap data for the table changes.
+    Path location = new PathBuilder(warehouseRoot.toString())
+        .addDescendant(dbName.toLowerCase() + ".db")
+        .addDescendant(tableName)
+        .addDescendant("delta_0000001_0000001_0000")
+        .addDescendant(bucket)
+        .build();
+    File junkFile = new File(tempFolder.getRoot(), "junk");
+    File saveFolder = new File(tempFolder.getRoot(), dbName + "_" + tableName);
+    Path savedCopy = new Path(saveFolder.getAbsolutePath(), bucket);
+
+    if (toCorrupted) {
+      if (!junkFile.exists()) {
+        FileUtils.writeStringToFile(junkFile, "junk", StandardCharsets.UTF_8);
+      }
+      // delSrc=true: move the real bucket file out, then overwrite its location with junk.
+      fs.copyToLocalFile(true, location, savedCopy);
+      fs.copyFromLocalFile(false, true, new Path(junkFile.getAbsolutePath()), location);
+    } else {
+      // Move the saved original back, overwriting the junk file.
+      fs.copyFromLocalFile(true, true, savedCopy, location);
+    }
+  }
+
+  /**
+   * Forces an abortTxn event in {@code dbName}. The steps are:
+   * corrupt t3's bucket file;
+   * run an update that must fail while executing the plan;
+   * restore the original bucket file.
+   *
+   * <p>The abort must happen during the execution phase of the plan, which is why the data file
+   * is mangled.
+   *
+   * @param dbName database containing the ACID table t3
+   * @param value id of the t3 row targeted by the failing update
+   */
+  private void prepareAbortTxn(String dbName, int value) throws Throwable {
+    alterBucketFile(primary.warehouseRoot, dbName, "t3", true);
+    try {
+      Throwable failure = null;
+      try {
+        primary.run("use " + dbName)
+            .run("update t3 set id = 999 where id = " + value);
+      } catch (Throwable t) {
+        failure = t;
+      }
+      // Assertions are kept outside the catch-all above; otherwise it would swallow the
+      // AssertionError and replace it with an unrelated exception.
+      Assert.assertNotNull("Update should have failed", failure);
+      Throwable cause = failure.getCause();
+      Assert.assertTrue(
+          "Expected a FileFormatException two levels down the cause chain, got " + failure,
+          cause != null && cause.getCause() instanceof org.apache.orc.FileFormatException);
+    } finally {
+      // Always restore the original bucket file, even if an assertion above fails.
+      alterBucketFile(primary.warehouseRoot, dbName, "t3", false);
+    }
+  }
+
+  /**
+   * Generates the incremental workload on the primary. Each database gets three inserts
+   * followed by one update that prepareAbortTxn() forces to fail. This is done in both the
+   * replicated database and the non-replicated one.
+   */
+  private void prepareIncrementalData() throws Throwable {
+    // Replicated database.
+    primary.run("use " + primaryDbName)
+        .run("insert into t1 values (2), (3)")
+        .run("insert into t2 partition(country='india') values ('chennai')")
+        .run("insert into t2 partition(country='india') values ('pune')");
+    prepareAbortTxn(primaryDbName, 222);
+
+    // Non-replicated database.
+    primary.run("use " + otherDbName)
+        .run("insert into t1 values (200), (300)")
+        .run("insert into t2 partition(country='usa') values ('santa clara')")
+        .run("insert into t2 partition(country='usa') values ('palo alto')");
+    prepareAbortTxn(otherDbName, 2220);
+  }
+
+  /**
+   * Burns transaction ids on the replica so that REPL LOAD needs to map source txn ids to
+   * target txn ids. It does this by creating ACID table t999 and running ten single-row
+   * inserts of the values 99901 to 99910.
+   */
+  private void burnTransactionsOnReplica() throws Throwable {
+    WarehouseInstance wh = replica.run("use " + replicatedDbName)
+        .run("create table t999 (id int) clustered by(id) into 3 buckets stored as orc " +
+            "tblproperties (\"transactional\"=\"true\")");
+    // One statement per value: "insert into t999 values (99901)" ... "(99910)".
+    for (int i = 1; i <= 10; i++) {
+      wh = wh.run("insert into t999 values (" + (99900 + i) + ")");
+    }
+  }
+
+  /**
+   * Builds the option list passed to dump()/load() to toggle REPL_FILTER_TRANSACTIONS.
+   *
+   * @param optimizationOn whether transaction-event filtering is enabled
+   * @return a single-element list of the form {@code 'key'='true'} or {@code 'key'='false'}
+   */
+  private List<String> withTxnOptimized(boolean optimizationOn) {
+    String key = HiveConf.ConfVars.REPL_FILTER_TRANSACTIONS.toString();
+    String option = String.format("'%s'='%s'", key, String.valueOf(optimizationOn));
+    return Collections.singletonList(option);
+  }
+
+  /**
+   * With REPL_FILTER_TRANSACTIONS off, no transaction events are filtered out of the
+   * incremental dump.
+   */
+  @Test
+  public void testTxnEventsUnoptimized() throws Throwable {
+    // Bootstrap cycle, filtering off.
+    prepareBootstrapData();
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+        .dump(primaryDbName, withTxnOptimized(false));
+    replica.load(replicatedDbName, primaryDbName, withTxnOptimized(false));
+    assertBootstrap(bootstrapDump);
+
+    // TODO: burnTransactionsOnReplica() is not called here. Calling it led to unexpected
+    // open_txn() callback counts in this test (but not in testTxnEventsOptimized), for reasons
+    // not yet understood.
+    PrimaryEventListenerTestImpl.reset();
+    ReplicaEventListenerTestImpl.reset();
+
+    // Incremental cycle, filtering off: nothing is expected to be filtered.
+    prepareIncrementalData();
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+        .dump(primaryDbName, withTxnOptimized(false));
+    replica.load(replicatedDbName, primaryDbName, withTxnOptimized(false));
+
+    assertTxnOptimization(false, incrementalDump, new EventCount(0, 0, 0));
+  }
+
+  /**
+   * With REPL_FILTER_TRANSACTIONS on, the incremental dump drops the transaction events that
+   * belong to the non-replicated database.
+   */
+  @Test
+  public void testTxnEventsOptimized() throws Throwable {
+    // Bootstrap cycle, filtering off.
+    prepareBootstrapData();
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+        .dump(primaryDbName, withTxnOptimized(false));
+    replica.load(replicatedDbName, primaryDbName, withTxnOptimized(false));
+    assertBootstrap(bootstrapDump);
+
+    burnTransactionsOnReplica();
+    PrimaryEventListenerTestImpl.reset();
+    ReplicaEventListenerTestImpl.reset();
+
+    // Incremental cycle, filtering on.
+    prepareIncrementalData();
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+        .dump(primaryDbName, withTxnOptimized(true));
+    replica.load(replicatedDbName, primaryDbName, withTxnOptimized(true));
+
+    // The non-replicated database's share of the incremental workload: 4 opened,
+    // 3 committed, 1 aborted.
+    assertTxnOptimization(true, incrementalDump, new EventCount(4, 3, 1));
+  }
+
+  /**
+   * Asserts that ReplicationTestUtils.findTxnsFromDump finds no open, commit or abort
+   * transaction events in a bootstrap dump.
+   *
+   * @param tuple result of the bootstrap dump
+   * @throws IOException if the dump cannot be scanned
+   */
+  private void assertBootstrap(WarehouseInstance.Tuple tuple) throws IOException {
+    List<Path> openTxns = new ArrayList<>();
+    List<Path> commitTxns = new ArrayList<>();
+    List<Path> abortTxns = new ArrayList<>();
+
+    ReplicationTestUtils.findTxnsFromDump(tuple, primary.hiveConf, openTxns, commitTxns,
+        abortTxns);
+
+    // Expected value first, so failure messages report "expected 0 but was N".
+    Assert.assertEquals("open txn events in bootstrap dump", 0, openTxns.size());
+    Assert.assertEquals("commit txn events in bootstrap dump", 0, commitTxns.size());
+    Assert.assertEquals("abort txn events in bootstrap dump", 0, abortTxns.size());
+  }
+
+ private void assertTxnOptimization(boolean optimizationOn,
WarehouseInstance.Tuple tuple, EventCount filtered) throws Exception {
+
+ List<Path> openTxns = new ArrayList<Path>();
+ List<Path> commitTxns = new ArrayList<Path>();
+ List<Path> abortTxns = new ArrayList<Path>();
+
+ ReplicationTestUtils.findTxnsFromDump(tuple, primary.hiveConf, openTxns,
commitTxns, abortTxns);
+
+ Assert.assertEquals(expected.getCountOpenTxn(),
PrimaryEventListenerTestImpl.getCountOpenTxn());
+ Assert.assertEquals(expected.getCountCommitTxn(),
PrimaryEventListenerTestImpl.getCountCommitTxn());
+ Assert.assertEquals(expected.getCountAbortTxn(),
PrimaryEventListenerTestImpl.getCountAbortTxn());
+
+ Assert.assertEquals(expected.getCountOpenTxn() -
filtered.getCountOpenTxn(), ReplicaEventListenerTestImpl.getCountOpenTxn());
+ Assert.assertEquals(expected.getCountCommitTxn() -
filtered.getCountCommitTxn(), ReplicaEventListenerTestImpl.getCountCommitTxn());
+ Assert.assertEquals(expected.getCountAbortTxn() -
filtered.getCountAbortTxn(), ReplicaEventListenerTestImpl.getCountAbortTxn());
+
+ Assert.assertEquals(optimizationOn ? 0 : expected.getCountOpenTxn(),
openTxns.size());
+ Assert.assertEquals(expected.getCountCommitTxn() -
filtered.getCountCommitTxn(), commitTxns.size());
+ Assert.assertEquals(expected.getCountAbortTxn() -
filtered.getCountAbortTxn(), abortTxns.size());
+
+// Map<Long, Long> replicaTxnMapping =
ReplicaEventListenerTestImpl.getTxnMapping();
Review Comment:
And this
Issue Time Tracking
-------------------
Worklog Id: (was: 784279)
Time Spent: 2h 10m (was: 2h)
> REPL DUMP should filter out OpenXacts and unneeded CommitXact/Abort.
> --------------------------------------------------------------------
>
> Key: HIVE-26265
> URL: https://issues.apache.org/jira/browse/HIVE-26265
> Project: Hive
> Issue Type: Improvement
> Components: HiveServer2
> Reporter: francis pang
> Assignee: francis pang
> Priority: Major
> Labels: pull-request-available
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> REPL DUMP replicates all OpenXacts, even those from other, non-replicated
> databases. This wastes space in the dump and ends up opening unneeded
> transactions during REPL LOAD.
>
> Add a config property for replication that filters out OpenXact events during
> REPL DUMP. During REPL LOAD, the txns can be opened implicitly when the
> ALLOC_WRITE_ID event is processed. CommitTxn and AbortTxn events should be dumped
> only if a write ID was allocated for the transaction.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)