[
https://issues.apache.org/jira/browse/HIVE-26265?focusedWorklogId=784737&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-784737
]
ASF GitHub Bot logged work on HIVE-26265:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Jun/22 22:47
Start Date: 24/Jun/22 22:47
Worklog Time Spent: 10m
Work Description: cmunkey commented on code in PR #3365:
URL: https://github.com/apache/hive/pull/3365#discussion_r906460588
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationFilterTransactions.java:
##########
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.*;
+import
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static
org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
+
+/**
+ * TestReplicationFilterTransactions - Test transaction replication.
+ */
+public class TestReplicationFilterTransactions {
+ static final private Logger LOG =
LoggerFactory.getLogger(TestReplicationFilterTransactions.class);
+
+ private final static String tid =
+
TestReplicationFilterTransactions.class.getCanonicalName().toLowerCase().replace('.','_')
+ "_" + System.currentTimeMillis();
+ private final static String TEST_PATH =
+ System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR +
tid;
+
+ @Rule
+ public TemporaryFolder tempFolder= new TemporaryFolder();
+
+ static public class PrimaryEventListenerTestImpl extends
MetaStoreEventListener {
+ public PrimaryEventListenerTestImpl(Configuration conf) {
+ super(conf);
+ }
+
+ private static final AtomicInteger countOpenTxn = new AtomicInteger(0);
+ private static final AtomicInteger countCommitTxn = new AtomicInteger (0);
+ private static final AtomicInteger countAbortTxn = new AtomicInteger (0);
+
+ @Override
+ public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn,
SQLGenerator sqlGenerator) throws MetaException {
+ super.onOpenTxn(openTxnEvent, dbConn, sqlGenerator);
+ countOpenTxn.getAndIncrement();
+ }
+
+ @Override
+ public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn,
SQLGenerator sqlGenerator) throws MetaException {
+ super.onCommitTxn(commitTxnEvent, dbConn, sqlGenerator);
+ countCommitTxn.getAndIncrement();
+ }
+
+ @Override
+ public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn,
SQLGenerator sqlGenerator) throws MetaException {
+ super.onAbortTxn(abortTxnEvent, dbConn, sqlGenerator);
+ countAbortTxn.getAndIncrement();
+ }
+
+ public static int getCountOpenTxn() {
+ return countOpenTxn.get();
+ }
+
+ public static int getCountCommitTxn() {
+ return countCommitTxn.get();
+ }
+
+ public static int getCountAbortTxn() {
+ return countAbortTxn.get();
+ }
+
+ public static void reset() {
+ countOpenTxn.set(0);
+ countCommitTxn.set(0);
+ countAbortTxn.set(0);
+ }
+ }
+
+ static public class ReplicaEventListenerTestImpl extends
MetaStoreEventListener {
+ public ReplicaEventListenerTestImpl(Configuration conf) {
+ super(conf);
+ }
+
+ private static final AtomicInteger countOpenTxn = new AtomicInteger(0);
+ private static final AtomicInteger countCommitTxn = new AtomicInteger (0);
+ private static final AtomicInteger countAbortTxn = new AtomicInteger (0);
+
+ private static final Map<Long, Long> txnMapping = new
ConcurrentHashMap<Long, Long>();
+
+ @Override
+ public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn,
SQLGenerator sqlGenerator) throws MetaException {
+ super.onOpenTxn(openTxnEvent, dbConn, sqlGenerator);
+ countOpenTxn.getAndIncrement();
+ // Following code tries to read/save REPL_TXN_MAP, so we can check later
test that target to source TxnId
+ // mapping was done.
+ // But the select below seems cause the following lock error
+ // org.apache.hadoop.hive.ql.lockmgr.LockException: Error
communicating with the metastore
+// try {
Review Comment:
I fixed this by doing a dirty read.
Issue Time Tracking
-------------------
Worklog Id: (was: 784737)
Time Spent: 2h 40m (was: 2.5h)
> REPL DUMP should filter out OpenXacts and unneeded CommitXact/Abort.
> --------------------------------------------------------------------
>
> Key: HIVE-26265
> URL: https://issues.apache.org/jira/browse/HIVE-26265
> Project: Hive
> Issue Type: Improvement
> Components: HiveServer2
> Reporter: francis pang
> Assignee: francis pang
> Priority: Major
> Labels: pull-request-available
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> REPL DUMP is replicating all OpenXacts, even when they are from other non
> replicated databases. This wastes space in the dump, and ends up opening
> unneeded transactions during REPL LOAD.
>
> Add a config property for replication that filters out OpenXact events during
> REPL DUMP. During REPL LOAD, the txns can be implicitly opened when the
> ALLOC_WRITE_ID is processed. For CommitTxn and AbortTxn, dump only if WRITE
> ID was allocated.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)