This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e892ce7212b HIVE-27481: Addendum: Fix post-refactor issues (Laszlo
Vegh, reviewed by Attila Turoczy, Denys Kuzmenko)
e892ce7212b is described below
commit e892ce7212b2f0c9e55092d89076b129c86c8172
Author: veghlaci05 <[email protected]>
AuthorDate: Thu Feb 1 11:02:15 2024 +0100
HIVE-27481: Addendum: Fix post-refactor issues (Laszlo Vegh, reviewed by
Attila Turoczy, Denys Kuzmenko)
Closes #5010
---
.../txn/compactor/TestMaterializedViewRebuild.java | 33 ++++++++++++++++++----
.../txn/jdbc/MultiDataSourceJdbcResource.java | 29 +++++++------------
.../txn/jdbc/functions/OnRenameFunction.java | 28 +++++++++---------
.../ReleaseMaterializationRebuildLocks.java | 18 ++++++------
.../jdbc/queries/LatestTxnIdInConflictHandler.java | 10 +++++--
5 files changed, 70 insertions(+), 48 deletions(-)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMaterializedViewRebuild.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMaterializedViewRebuild.java
index a0bf2608bfb..d38e6695cb4 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMaterializedViewRebuild.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMaterializedViewRebuild.java
@@ -17,20 +17,27 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
-import static
org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
import static
org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.execSelectAndDumpData;
+import static
org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
import static
org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriverSilently;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.when;
public class TestMaterializedViewRebuild extends CompactorOnTezTest {
@@ -182,4 +189,18 @@ public class TestMaterializedViewRebuild extends
CompactorOnTezTest {
Assert.assertEquals(expected, actual);
}
+ @Test
+ public void testMaterializationLockCleaned() throws Exception {
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ OpenTxnsResponse response = txnHandler.openTxns(new OpenTxnRequest(1,
"user", "host"));
+ txnHandler.lockMaterializationRebuild("default", TABLE1,
response.getTxn_ids().get(0));
+
+ //Mimic the lock can be cleaned up
+ ValidTxnList validTxnList = Mockito.mock(ValidReadTxnList.class);
+ when(validTxnList.isTxnValid(anyLong())).thenReturn(true);
+
+ long removedCnt =
txnHandler.cleanupMaterializationRebuildLocks(validTxnList, 10);
+ Assert.assertEquals(1, removedCnt);
+ }
+
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/MultiDataSourceJdbcResource.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/MultiDataSourceJdbcResource.java
index 101172c7407..7ab42c1336d 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/MultiDataSourceJdbcResource.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/MultiDataSourceJdbcResource.java
@@ -173,9 +173,9 @@ public class MultiDataSourceJdbcResource {
* @throws MetaException Forwarded from {@link
ParameterizedCommand#getParameterizedQueryString(DatabaseProduct)} or
* thrown if the update count was rejected by the
{@link ParameterizedCommand#resultPolicy()} method
*/
- public Integer execute(ParameterizedCommand command) throws MetaException {
+ public int execute(ParameterizedCommand command) throws MetaException {
if (!shouldExecute(command)) {
- return null;
+ return -1;
}
try {
return
execute(command.getParameterizedQueryString(getDatabaseProduct()),
@@ -191,32 +191,23 @@ public class MultiDataSourceJdbcResource {
* call using the query string obtained from {@link
ParameterizedBatchCommand#getParameterizedQueryString(DatabaseProduct)},
* the parameters obtained from {@link
ParameterizedBatchCommand#getQueryParameters()}, and the
* {@link org.springframework.jdbc.core.PreparedStatementSetter} obtained
from
- * {@link ParameterizedBatchCommand#getPreparedStatementSetter()} methods.
The batchSize is coming fomr the
- * {@link Configuration} object. After the execution, this method validates
the resulted number of affected rows using the
- * {@link ParameterizedBatchCommand#resultPolicy()} function for each
element in the batch.
+ * {@link ParameterizedBatchCommand#getPreparedStatementSetter()} methods.
The batchSize is coming from the
+ * {@link Configuration} object.
*
* @param command The {@link ParameterizedBatchCommand} to execute.
- * @return Returns an integer array,containing the number of affected rows
for each element in the batch.
*/
- public <T> int execute(ParameterizedBatchCommand<T> command) throws
MetaException {
+ public <T> int[][] execute(ParameterizedBatchCommand<T> command) throws
MetaException {
if (!shouldExecute(command)) {
- return 0;
+ return null;
}
try {
int maxBatchSize = MetastoreConf.getIntVar(conf,
MetastoreConf.ConfVars.JDBC_MAX_BATCH_SIZE);
- int[][] result = getJdbcTemplate().getJdbcTemplate().batchUpdate(
+ return getJdbcTemplate().getJdbcTemplate().batchUpdate(
command.getParameterizedQueryString(databaseProduct),
command.getQueryParameters(),
maxBatchSize,
command.getPreparedStatementSetter()
- );
-
- Function<Integer, Boolean> resultPolicy = command.resultPolicy();
- if (resultPolicy != null && !Arrays.stream(result).allMatch(inner ->
Arrays.stream(inner).allMatch(resultPolicy::apply))) {
- LOG.error("The update count was rejected in at least one of the result
array. Rolling back.");
- throw new MetaException("The update count was rejected in at least one
of the result array. Rolling back.");
- }
- return Arrays.stream(result).reduce(0, (acc, i) -> acc +
Arrays.stream(i).sum(), Integer::sum);
+ );
} catch (Exception e) {
handleError(command, e);
throw e;
@@ -300,7 +291,7 @@ public class MultiDataSourceJdbcResource {
* @throws MetaException Forwarded from {@link
ParameterizedCommand#getParameterizedQueryString(DatabaseProduct)} or
* thrown if the update count was rejected by the
{@link ParameterizedCommand#resultPolicy()} method
*/
- public Integer execute(String query, SqlParameterSource params,
+ public int execute(String query, SqlParameterSource params,
Function<Integer, Boolean> resultPolicy) throws
MetaException {
LOG.debug("Going to execute command <{}>", query);
int count = getJdbcTemplate().update(query, params);
@@ -322,7 +313,7 @@ public class MultiDataSourceJdbcResource {
* @return Returns with the object(s) constructed from the result of the
executed query.
* @throws MetaException Forwarded from {@link
ParameterizedCommand#getParameterizedQueryString(DatabaseProduct)}.
*/
- public <Result> Result execute(QueryHandler<Result> queryHandler) throws
MetaException {
+ public <T> T execute(QueryHandler<T> queryHandler) throws MetaException {
String queryStr =
queryHandler.getParameterizedQueryString(getDatabaseProduct());
LOG.debug("Going to execute query <{}>", queryStr);
SqlParameterSource params = queryHandler.getQueryParameters();
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/OnRenameFunction.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/OnRenameFunction.java
index 69173857f43..1167ee4f42a 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/OnRenameFunction.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/OnRenameFunction.java
@@ -26,6 +26,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import java.sql.Types;
+
public class OnRenameFunction implements TransactionalFunction<Void> {
private static final Logger LOG =
LoggerFactory.getLogger(OnRenameFunction.class);
@@ -105,12 +107,12 @@ public class OnRenameFunction implements
TransactionalFunction<Void> {
public OnRenameFunction(String oldCatName, String oldDbName, String
oldTabName, String oldPartName,
String newCatName, String newDbName, String
newTabName, String newPartName) {
this.oldCatName = oldCatName;
- this.oldDbName = oldDbName;
- this.oldTabName = oldTabName;
+ this.oldDbName = StringUtils.lowerCase(oldDbName);
+ this.oldTabName = StringUtils.lowerCase(oldTabName);
this.oldPartName = oldPartName;
this.newCatName = newCatName;
- this.newDbName = newDbName;
- this.newTabName = newTabName;
+ this.newDbName = StringUtils.lowerCase(newDbName);
+ this.newTabName = StringUtils.lowerCase(newTabName);
this.newPartName = newPartName;
}
@@ -121,23 +123,23 @@ public class OnRenameFunction implements
TransactionalFunction<Void> {
oldCatName + "," + oldDbName + "," + oldTabName + "," + oldPartName +
"," +
newCatName + "," + newDbName + "," + newTabName + "," + newPartName +
")";
- if(newPartName != null) {
+ if (newPartName != null) {
assert oldPartName != null && oldTabName != null && oldDbName != null &&
oldCatName != null : callSig;
}
- if(newTabName != null) {
+ if (newTabName != null) {
assert oldTabName != null && oldDbName != null && oldCatName != null :
callSig;
}
- if(newDbName != null) {
+ if (newDbName != null) {
assert oldDbName != null && oldCatName != null : callSig;
}
MapSqlParameterSource paramSource = new MapSqlParameterSource()
- .addValue("oldDbName", StringUtils.lowerCase(oldDbName))
- .addValue("newDbName", StringUtils.lowerCase(newDbName))
- .addValue("oldTableName", StringUtils.lowerCase(oldTabName))
- .addValue("newTableName", StringUtils.lowerCase(newTabName))
- .addValue("oldPartName", oldPartName)
- .addValue("newPartName", newPartName);
+ .addValue("oldDbName", oldDbName, Types.VARCHAR)
+ .addValue("newDbName", newDbName, Types.VARCHAR)
+ .addValue("oldTableName", oldTabName, Types.VARCHAR)
+ .addValue("newTableName", newTabName, Types.VARCHAR)
+ .addValue("oldPartName", oldPartName, Types.VARCHAR)
+ .addValue("newPartName", newPartName, Types.VARCHAR);
try {
for (String command : UPDATE_COMMANNDS) {
jdbcResource.getJdbcTemplate().update(command, paramSource);
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/ReleaseMaterializationRebuildLocks.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/ReleaseMaterializationRebuildLocks.java
index 0ddebae5c58..86f1af05e6a 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/ReleaseMaterializationRebuildLocks.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/ReleaseMaterializationRebuildLocks.java
@@ -53,14 +53,16 @@ public class ReleaseMaterializationRebuildLocks implements
TransactionalFunction
LOG.debug("Going to execute query <{}>", selectQ);
jdbcResource.getJdbcTemplate().query(selectQ, rs -> {
- long lastHeartbeat = rs.getLong(2);
- if (lastHeartbeat < timeoutTime) {
- // The heartbeat has timeout, double check whether we can remove it
- long txnId = rs.getLong(1);
- if (validTxnList.isTxnValid(txnId) ||
validTxnList.isTxnAborted(txnId)) {
- // Txn was committed (but notification was not received) or it was
aborted.
- // Either case, we can clean it up
- txnIds.add(txnId);
+ if (rs.next()) {
+ long lastHeartbeat = rs.getLong(2);
+ if (lastHeartbeat < timeoutTime) {
+ // The heartbeat has timeout, double check whether we can remove it
+ long txnId = rs.getLong(1);
+ if (validTxnList.isTxnValid(txnId) ||
validTxnList.isTxnAborted(txnId)) {
+ // Txn was committed (but notification was not received) or it was
aborted.
+ // Either case, we can clean it up
+ txnIds.add(txnId);
+ }
}
}
return null;
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/LatestTxnIdInConflictHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/LatestTxnIdInConflictHandler.java
index 33c7719cc71..f63748ca966 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/LatestTxnIdInConflictHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/LatestTxnIdInConflictHandler.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hive.metastore.txn.jdbc.queries;
-import com.sun.tools.javac.util.List;
+import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.metastore.DatabaseProduct;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.txn.entities.OperationType;
@@ -30,8 +30,14 @@ import
org.springframework.jdbc.core.namedparam.SqlParameterSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
+import java.util.List;
public class LatestTxnIdInConflictHandler implements QueryHandler<Long> {
+
+ private static final List<String> OPERATION_TYPES = ImmutableList.of(
+ OperationType.UPDATE.getSqlConst(),
+ OperationType.DELETE.getSqlConst()
+ );
private final long txnId;
@@ -67,7 +73,7 @@ public class LatestTxnIdInConflictHandler implements
QueryHandler<Long> {
public SqlParameterSource getQueryParameters() {
return new MapSqlParameterSource()
.addValue("txnId", txnId)
- .addValue("types", List.of(OperationType.UPDATE.getSqlConst(),
OperationType.DELETE.getSqlConst()), Types.CHAR)
+ .addValue("types", OPERATION_TYPES, Types.CHAR)
.addValue("wsType", OperationType.INSERT.getSqlConst(), Types.CHAR);
}