This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new d0372808177 HIVE-28772: Clear REPL_TXN_MAP table on DR when deleting
replication policy (#5656) (Harshal Patel, reviewed by Teddy Choi)
d0372808177 is described below
commit d0372808177a823d63383e311c5909aa46b9a961
Author: Harshal Patel <[email protected]>
AuthorDate: Fri Mar 7 12:40:07 2025 +0530
HIVE-28772: Clear REPL_TXN_MAP table on DR when deleting replication policy
(#5656) (Harshal Patel, reviewed by Teddy Choi)
Details:
- Problem:
* Suppose a user is running incremental replication. With the current design,
a transaction can span multiple replication cycles.
* So, if an open transaction is replayed to the DR side and the user then
deletes the databases and the policy on both the source and DR sides, a
dangling entry is left in the REPL_TXN_MAP table on the DR side.
* If the user later creates a database with the same name as before, then
after 11 days the housekeeper thread deletes the dangling REPL_TXN_MAP entry,
which marks the newly created database (replicated under the new replication
policy) as incompatible.
- Solution:
* When a REPL LOAD scheduled query is deleted, check the DR side for open
REPL_CREATED transactions. If there are any, abort them and set the
repl.Incompatible flag on the DR side. The flag indicates that a transaction
was aborted, so a bootstrap will be required for future replication.
* If the user nevertheless tries to resume the replication, the DR side will be
in an inconsistent state.
* If there are no REPL_CREATED open transactions, the operation is the same as
before.
---
.../apache/hadoop/hive/metastore/HMSHandler.java | 44 ++++++++++++++
.../hadoop/hive/metastore/txn/TxnHandler.java | 20 +++++++
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 5 ++
.../GetTargetTxnIdListForPolicyHandler.java | 64 ++++++++++++++++++++
.../client/TestMetastoreScheduledQueries.java | 69 ++++++++++++++++++++++
5 files changed, 202 insertions(+)
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index ec50975f879..0b1b7c27ed6 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -88,6 +88,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -10925,8 +10926,13 @@ public void
scheduled_query_maintenance(ScheduledQueryMaintenanceRequest request
startFunction("scheduled_query_poll");
Exception ex = null;
try {
+ String query = request.getScheduledQuery().getQuery();
+ ScheduledQueryMaintenanceRequestType requestType = request.getType();
RawStore ms = getMS();
ms.scheduledQueryMaintenance(request);
+ if (requestType == ScheduledQueryMaintenanceRequestType.DROP) {
+ abortReplCreatedOpenTxnsForDatabase(query);
+ }
} catch (Exception e) {
LOG.error("Caught exception", e);
ex = e;
@@ -10936,6 +10942,44 @@ public void
scheduled_query_maintenance(ScheduledQueryMaintenanceRequest request
}
}
+ private void abortReplCreatedOpenTxnsForDatabase(String query) throws
TException {
+ List<Long> toBeAbortedTxns = null;
+ List<TxnType> txnListExcludingReplCreated = new ArrayList<>();
+ String pattern = "(?<=REPL LOAD )\\w+(?= INTO \\w+)";
+ Pattern regex = Pattern.compile(pattern);
+ Matcher matcher = regex.matcher(query);
+ String dbName;
+ if (matcher.find()) {
+ dbName = matcher.group();
+ String replPolicy = dbName + ".*";
+ for (TxnType type : TxnType.values()) {
+ // exclude REPL_CREATED txn
+ if (type != TxnType.REPL_CREATED) {
+ txnListExcludingReplCreated.add(type);
+ }
+ }
+ List<Long> openTxnList = null;
+ GetOpenTxnsResponse openTxnsResponse = null;
+ try {
+ openTxnsResponse = getTxnHandler()
+ .getOpenTxns(txnListExcludingReplCreated);
+ } catch (Exception e) {
+ LOG.error("Got an error : " + e);
+ }
+ if (openTxnsResponse != null) {
+ openTxnList = openTxnsResponse.getOpen_txns();
+ if (openTxnList != null) {
+ toBeAbortedTxns = getTxnHandler()
+ .getOpenTxnForPolicy(openTxnList, replPolicy);
+ if (!toBeAbortedTxns.isEmpty()) {
+ LOG.info("Aborting Repl created open transactions");
+ abort_txns(new AbortTxnsRequest(toBeAbortedTxns));
+ }
+ }
+ }
+ }
+ }
+
@Override
public void scheduled_query_progress(ScheduledQueryProgressInfo info) throws
MetaException, TException {
startFunction("scheduled_query_poll");
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 6e002da6ad9..547c3a04a61 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -374,6 +374,26 @@ public GetOpenTxnsResponse getOpenTxns(List<TxnType>
excludeTxnTypes) throws Met
.toOpenTxnsResponse(excludeTxnTypes);
}
+ @Override
+ public List<Long> getOpenTxnForPolicy(List<Long> openTxnList, String
replPolicy) {
+
+ if (openTxnList.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<Long> targetTxnIds = null;
+ try {
+ targetTxnIds = jdbcResource.execute(new
GetTargetTxnIdListForPolicyHandler(replPolicy, openTxnList));
+ } catch (MetaException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (targetTxnIds.isEmpty()) {
+ LOG.info("There are no Repl Created open transactions on DR side.");
+ }
+ return targetTxnIds;
+ }
+
+
/**
* Retry-by-caller note:
* Worst case, it will leave an open txn which will timeout.
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index f31308ba397..2cce4966cd1 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -186,6 +186,11 @@ enum MUTEX_KEY {
@RetrySemantics.ReadOnly
GetOpenTxnsResponse getOpenTxns(List<TxnType> excludeTxnTypes) throws
MetaException;
+ /**
+ * Returns the ids from {@code openTxnList} that REPL_TXN_MAP records as target transactions
+ * (RTM_TARGET_TXN_ID) of the replication policy {@code replPolicy}.
+ *
+ * @param openTxnList candidate transaction ids on this (target) side
+ * @param replPolicy replication policy to match against RTM_REPL_POLICY, e.g. {@code "db.*"}
+ * @return the matching transaction ids; empty when none match
+ */
+ @SqlRetry
+ @Transactional(POOL_TX)
+ @RetrySemantics.ReadOnly
+ List<Long> getOpenTxnForPolicy(List<Long> openTxnList, String replPolicy);
+
/**
* Get the count for open transactions.
* @throws MetaException
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTargetTxnIdListForPolicyHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTargetTxnIdListForPolicyHandler.java
new file mode 100644
index 00000000000..6ddf6f5e1ce
--- /dev/null
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTargetTxnIdListForPolicyHandler.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn.jdbc.queries;
+
+import org.apache.hadoop.hive.metastore.DatabaseProduct;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.txn.jdbc.QueryHandler;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+public class GetTargetTxnIdListForPolicyHandler implements
QueryHandler<List<Long>> {
+
+ private final String replPolicy;
+ private final List<Long> txnIds;
+
+ public GetTargetTxnIdListForPolicyHandler(String replPolicy, List<Long>
txnIds) {
+ this.replPolicy = replPolicy;
+ this.txnIds = txnIds;
+ }
+
+ @Override
+ public String getParameterizedQueryString(DatabaseProduct databaseProduct)
throws MetaException {
+ return "SELECT \"RTM_TARGET_TXN_ID\" FROM \"REPL_TXN_MAP\" " +
+ "WHERE \"RTM_TARGET_TXN_ID\" IN (:txnIds) AND
\"RTM_REPL_POLICY\" = :replPolicy";
+ }
+
+ @Override
+ public SqlParameterSource getQueryParameters() {
+ return new MapSqlParameterSource()
+ .addValue("txnIds", txnIds, Types.BIGINT)
+ .addValue("replPolicy", replPolicy);
+ }
+
+ @Override
+ public List<Long> extractData(ResultSet rs) throws SQLException,
DataAccessException {
+ List<Long> targetTxnIdList = new ArrayList<>();
+ while (rs.next()) {
+ targetTxnIdList.add(rs.getLong(1));
+ }
+ return targetTxnIdList;
+ }
+}
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestMetastoreScheduledQueries.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestMetastoreScheduledQueries.java
index 6a9d2893f2f..b719d338f7f 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestMetastoreScheduledQueries.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestMetastoreScheduledQueries.java
@@ -17,6 +17,11 @@
*/
package org.apache.hadoop.hive.metastore.client;
+import static org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil.cleanDb;
+import static
org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil.getConnection;
+import static org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil.prepDb;
+import static
org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil.queryToString;
+import static
org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil.setConfValues;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
@@ -25,8 +30,12 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -38,6 +47,7 @@
import javax.jdo.Query;
import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.ObjectStore;
import org.apache.hadoop.hive.metastore.ObjectStoreTestHook;
@@ -55,6 +65,7 @@
import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollRequest;
import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollResponse;
import org.apache.hadoop.hive.metastore.api.ScheduledQueryProgressInfo;
+import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.minihms.AbstractMetaStoreService;
@@ -214,6 +225,55 @@ public void testDeleteNonExistent() throws Exception {
client.scheduledQueryMaintenance(r);
}
+  @Test
+  public void testDeleteWithOpenTxn() throws Exception {
+    // Open transactions cannot be created from here in remote mode. Use an assumption so that
+    // the test is reported as skipped there instead of silently passing.
+    org.junit.Assume.assumeTrue(
+        "It is not possible to create open transaction from here in Remote mode. So, skipping the test case",
+        Objects.equals(client.getConfigValue(String.valueOf(ConfVars.THRIFT_URIS), ""), ""));
+    String testCaseNS = "delwithopentxn";
+    String replPolicy = "db100";
+    // create a "REPL LOAD db100 INTO db100" scheduled query
+    ScheduledQuery schq = createScheduledQuery3(createKey(replPolicy, testCaseNS));
+    ScheduledQueryMaintenanceRequest r = new ScheduledQueryMaintenanceRequest();
+    r.setType(ScheduledQueryMaintenanceRequestType.CREATE);
+    r.setScheduledQuery(schq);
+    client.scheduledQueryMaintenance(r);
+    // wait 2 sec to have the query execution
+    Thread.sleep(2000);
+    // invoke poll to create a dependent execution
+    client.scheduledQueryPoll(new ScheduledQueryPollRequest(testCaseNS));
+
+    Configuration conf = metaStore.getConf();
+    cleanDb(conf);
+    setConfValues(conf);
+    prepDb(conf);
+    try {
+      // make sure the "default" database row exists before adding the replicated database row
+      String[] output = queryToString(conf,
+          "select \"DB_ID\" from \"DBS\" where \"NAME\" = 'default' and \"CTLG_NAME\" = 'hive'")
+          .split("\n");
+      if (output.length == 1) {
+        // only the header line came back, i.e. there is no "default" row yet
+        executeMetastoreDbUpdate(conf, "INSERT INTO \"DBS\"(\"DB_ID\", \"NAME\", \"CTLG_NAME\", "
+            + "\"DB_LOCATION_URI\") VALUES (1, 'default','hive','dummy')");
+      }
+      executeMetastoreDbUpdate(conf, "INSERT INTO \"DBS\"(\"DB_ID\", \"NAME\", \"CTLG_NAME\", "
+          + "\"DB_LOCATION_URI\") VALUES (2, '" + replPolicy + "','hive','dummy')");
+
+      List<Long> openTxnIds = new ArrayList<>(Arrays.asList(1L, 2L));
+      client.replOpenTxn(replPolicy + ".*", openTxnIds, "hive", TxnType.REPL_CREATED);
+      String[] replTxnMapOutput =
+          queryToString(conf, "SELECT \"RTM_SRC_TXN_ID\" FROM \"REPL_TXN_MAP\"").split("\n");
+      // header line plus one row per replicated transaction
+      assertEquals(3, replTxnMapOutput.length);
+
+      // dropping the scheduled query should abort the repl-created txns and clear their mapping
+      r.setType(ScheduledQueryMaintenanceRequestType.DROP);
+      client.scheduledQueryMaintenance(r);
+      replTxnMapOutput =
+          queryToString(conf, "SELECT \"RTM_SRC_TXN_ID\" FROM \"REPL_TXN_MAP\"").split("\n");
+      assertEquals(1, replTxnMapOutput.length);
+    } finally {
+      // leave the txn tables clean for other tests even when an assertion above fails
+      cleanDb(conf);
+    }
+  }
+
+  /** Runs one DML statement against the metastore backing database and closes the connection. */
+  private static void executeMetastoreDbUpdate(Configuration conf, String sql) throws Exception {
+    try (java.sql.Connection conn = getConnection(conf); Statement stmt = conn.createStatement()) {
+      stmt.executeUpdate(sql);
+    }
+  }
+
@Test
public void testExclusivePoll() throws Exception {
try {
@@ -645,4 +705,13 @@ private ScheduledQuery
createScheduledQuery2(ScheduledQueryKey key) {
return schq;
}
+  /**
+   * Builds an enabled scheduled query for {@code key} that runs
+   * {@code REPL LOAD db100 INTO db100} as "user" on the schedule {@code "* * * * * ? *"}.
+   */
+  private ScheduledQuery createScheduledQuery3(ScheduledQueryKey key) {
+    ScheduledQuery replLoadQuery = new ScheduledQuery();
+    replLoadQuery.setQuery("REPL LOAD db100 INTO db100");
+    replLoadQuery.setUser("user");
+    replLoadQuery.setSchedule("* * * * * ? *");
+    replLoadQuery.setEnabled(true);
+    replLoadQuery.setScheduleKey(key);
+    return replLoadQuery;
+  }
}