zabetak commented on code in PR #6290:
URL: https://github.com/apache/hive/pull/6290#discussion_r2841170965
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -390,150 +508,102 @@ private long
updateTableProp(MultiDataSourceJdbcResource jdbcResource, String ca
//TODO: would be better to replace with MERGE or UPSERT
String command;
- if (dbEntityParam.key == null) {
+ if (dbEntityParam.key() == null) {
command = "INSERT INTO \"TABLE_PARAMS\" VALUES (:tblId, :key, :value)";
- } else if (!dbEntityParam.value.equals(propValue)) {
+ } else if (!dbEntityParam.value().equals(propValue)) {
command = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE
\"TBL_ID\" = :dbId AND \"PARAM_KEY\" = :key";
} else {
LOG.info("Database property: {} with value: {} already updated for db:
{}", prop, propValue, db);
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating {} for table: {} using command {}", prop, table,
command);
}
SqlParameterSource params = new MapSqlParameterSource()
- .addValue("tblId", dbEntityParam.id)
+ .addValue("tblId", dbEntityParam.id())
.addValue("key", prop)
.addValue("value", propValue);
if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
//only one row insert or update should happen
throw new RuntimeException("TABLE_PARAMS is corrupted for table: " +
table);
}
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
private void updatePartitionProp(MultiDataSourceJdbcResource jdbcResource,
long tableId,
- List<String> partList, String prop, String
propValue) {
+ List<String> partList, String prop, String propValue) {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
//language=SQL
- prefix.append(
- "SELECT p.\"PART_ID\", pp.\"PARAM_KEY\", pp.\"PARAM_VALUE\" FROM
\"PARTITION_PARAMS\" pp\n" +
- "RIGHT JOIN \"PARTITIONS\" p ON pp.\"PART_ID\" = p.\"PART_ID\" WHERE
p.\"TBL_ID\" = :tblId AND pp.\"PARAM_KEY\" = :key");
+ prefix.append("""
+ SELECT p."PART_ID", pp."PARAM_KEY", pp."PARAM_VALUE" FROM
"PARTITION_PARAMS" pp
+ RIGHT JOIN "PARTITIONS" p ON pp."PART_ID" = p."PART_ID" WHERE
p."TBL_ID" = :tblId AND pp."PARAM_KEY" = :key""");
// Populate the complete query with provided prefix and suffix
TxnUtils.buildQueryWithINClauseStrings(jdbcResource.getConf(), queries,
prefix, suffix, partList,
"\"PART_NAME\"", true, false);
+
SqlParameterSource params = new MapSqlParameterSource()
.addValue("tblId", tableId)
.addValue("key", prop);
+
+ RowMapper<DbEntityParam> mapper = (rs, rowNum) ->
+ new DbEntityParam(
+ rs.getLong("PART_ID"), rs.getString("PARAM_KEY"),
rs.getString("PARAM_VALUE")
+ );
List<DbEntityParam> partitionParams = new ArrayList<>();
- for(String query : queries) {
+
+ for (String query : queries) {
if (LOG.isDebugEnabled()) {
LOG.debug("Going to execute query <" + query + ">");
}
- jdbcResource.getJdbcTemplate().query(query, params,
- (ResultSet rs) -> {
- while (rs.next()) {
- partitionParams.add(new DbEntityParam(rs.getLong("PART_ID"),
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")));
- }
- });
+ partitionParams.addAll(
+ jdbcResource.getJdbcTemplate().query(query, params, mapper)
+ );
}
//TODO: would be better to replace with MERGE or UPSERT
int maxBatchSize = MetastoreConf.getIntVar(jdbcResource.getConf(),
MetastoreConf.ConfVars.JDBC_MAX_BATCH_SIZE);
//all insert in one batch
int[][] inserts =
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"INSERT INTO \"PARTITION_PARAMS\" VALUES (?, ?, ?)",
- partitionParams.stream().filter(p -> p.key ==
null).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() == null)
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
- ps.setLong(1, argument.id);
- ps.setString(2, argument.key);
+ ps.setLong(1, argument.id());
+ ps.setString(2, argument.key());
ps.setString(3, propValue);
});
//all update in one batch
int[][] updates
=jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"PART_ID\"
= ? AND \"PARAM_KEY\" = ?",
- partitionParams.stream().filter(p -> p.key != null &&
!propValue.equals(p.value)).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() != null && !propValue.equals(p.value()))
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
ps.setString(1, propValue);
- ps.setLong(2, argument.id);
- ps.setString(3, argument.key);
+ ps.setLong(2, argument.id());
+ ps.setString(3, argument.key());
});
- if (Arrays.stream(inserts).flatMapToInt(IntStream::of).sum() +
Arrays.stream(updates).flatMapToInt(IntStream::of).sum() != partList.size()) {
+ int totalChanges =
+ Arrays.stream(inserts)
+ .flatMapToInt(IntStream::of)
+ .sum()
+ + Arrays.stream(updates)
+ .flatMapToInt(IntStream::of)
+ .sum();
+
+ if (totalChanges != partList.size()) {
throw new RuntimeException("PARTITION_PARAMS is corrupted, update
failed");
}
}
- private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource
jdbcResource, long txnid) throws MetaException {
Review Comment:
Moving this method creates a big and unnecessary diff. Bringing it back to
its original position will significantly reduce backport conflicts and make
the real changes easier to follow.
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -581,23 +652,36 @@ private void
updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc
queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid);
// DO NOT remove the transaction from the TXN table, the cleaner will
remove it when appropriate
queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " +
TxnStatus.COMMITTED + " WHERE \"TXN_ID\" = " + txnid);
+
if (txnType == TxnType.MATER_VIEW_REBUILD) {
queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE
\"MRL_TXN_ID\" = " + txnid);
}
+
if (txnType == TxnType.SOFT_DELETE ||
MetaStoreServerUtils.isCompactionTxn(txnType)) {
- queryBatch.add("UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " +
commitId + ", \"CQ_COMMIT_TIME\" = " +
- getEpochFn(jdbcResource.getDatabaseProduct()) + " WHERE
\"CQ_TXN_ID\" = " + txnid);
+ // For soft-delete and compaction transactions, we need to set the
commit id
+ if (commitInfo.isEmpty() && !ConfVars.useMinHistoryWriteId()) {
+ // Take a global txn lock to synchronize the openTxn and commitTxn
operations
+ new AcquireTxnLockFunction(false).execute(jdbcResource);
+ }
+ queryBatch.add("""
+ UPDATE "COMPACTION_QUEUE" SET "CQ_NEXT_TXN_ID" = %s,
"CQ_COMMIT_TIME" = %s
+ WHERE "CQ_TXN_ID" = %d"""
+ .formatted(
+ defaultIfNull(commitInfo.commitId(), "(SELECT MAX(\"TXN_ID\")
FROM \"TXNS\")"),
Review Comment:
This default value for setting `CQ_NEXT_TXN_ID` seems like new code. Was it
there before? Why do we need it now?
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -390,150 +508,102 @@ private long
updateTableProp(MultiDataSourceJdbcResource jdbcResource, String ca
//TODO: would be better to replace with MERGE or UPSERT
String command;
- if (dbEntityParam.key == null) {
+ if (dbEntityParam.key() == null) {
command = "INSERT INTO \"TABLE_PARAMS\" VALUES (:tblId, :key, :value)";
- } else if (!dbEntityParam.value.equals(propValue)) {
+ } else if (!dbEntityParam.value().equals(propValue)) {
command = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE
\"TBL_ID\" = :dbId AND \"PARAM_KEY\" = :key";
} else {
LOG.info("Database property: {} with value: {} already updated for db:
{}", prop, propValue, db);
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating {} for table: {} using command {}", prop, table,
command);
}
SqlParameterSource params = new MapSqlParameterSource()
- .addValue("tblId", dbEntityParam.id)
+ .addValue("tblId", dbEntityParam.id())
.addValue("key", prop)
.addValue("value", propValue);
if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
//only one row insert or update should happen
throw new RuntimeException("TABLE_PARAMS is corrupted for table: " +
table);
}
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
private void updatePartitionProp(MultiDataSourceJdbcResource jdbcResource,
long tableId,
- List<String> partList, String prop, String
propValue) {
+ List<String> partList, String prop, String propValue) {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
//language=SQL
- prefix.append(
- "SELECT p.\"PART_ID\", pp.\"PARAM_KEY\", pp.\"PARAM_VALUE\" FROM
\"PARTITION_PARAMS\" pp\n" +
- "RIGHT JOIN \"PARTITIONS\" p ON pp.\"PART_ID\" = p.\"PART_ID\" WHERE
p.\"TBL_ID\" = :tblId AND pp.\"PARAM_KEY\" = :key");
+ prefix.append("""
+ SELECT p."PART_ID", pp."PARAM_KEY", pp."PARAM_VALUE" FROM
"PARTITION_PARAMS" pp
+ RIGHT JOIN "PARTITIONS" p ON pp."PART_ID" = p."PART_ID" WHERE
p."TBL_ID" = :tblId AND pp."PARAM_KEY" = :key""");
// Populate the complete query with provided prefix and suffix
TxnUtils.buildQueryWithINClauseStrings(jdbcResource.getConf(), queries,
prefix, suffix, partList,
"\"PART_NAME\"", true, false);
+
SqlParameterSource params = new MapSqlParameterSource()
.addValue("tblId", tableId)
.addValue("key", prop);
+
+ RowMapper<DbEntityParam> mapper = (rs, rowNum) ->
+ new DbEntityParam(
+ rs.getLong("PART_ID"), rs.getString("PARAM_KEY"),
rs.getString("PARAM_VALUE")
+ );
List<DbEntityParam> partitionParams = new ArrayList<>();
- for(String query : queries) {
+
+ for (String query : queries) {
if (LOG.isDebugEnabled()) {
LOG.debug("Going to execute query <" + query + ">");
}
- jdbcResource.getJdbcTemplate().query(query, params,
- (ResultSet rs) -> {
- while (rs.next()) {
- partitionParams.add(new DbEntityParam(rs.getLong("PART_ID"),
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")));
- }
- });
+ partitionParams.addAll(
+ jdbcResource.getJdbcTemplate().query(query, params, mapper)
+ );
}
//TODO: would be better to replace with MERGE or UPSERT
int maxBatchSize = MetastoreConf.getIntVar(jdbcResource.getConf(),
MetastoreConf.ConfVars.JDBC_MAX_BATCH_SIZE);
//all insert in one batch
int[][] inserts =
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"INSERT INTO \"PARTITION_PARAMS\" VALUES (?, ?, ?)",
- partitionParams.stream().filter(p -> p.key ==
null).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() == null)
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
- ps.setLong(1, argument.id);
- ps.setString(2, argument.key);
+ ps.setLong(1, argument.id());
+ ps.setString(2, argument.key());
ps.setString(3, propValue);
});
//all update in one batch
int[][] updates
=jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"PART_ID\"
= ? AND \"PARAM_KEY\" = ?",
- partitionParams.stream().filter(p -> p.key != null &&
!propValue.equals(p.value)).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() != null && !propValue.equals(p.value()))
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
ps.setString(1, propValue);
- ps.setLong(2, argument.id);
- ps.setString(3, argument.key);
+ ps.setLong(2, argument.id());
+ ps.setString(3, argument.key());
});
- if (Arrays.stream(inserts).flatMapToInt(IntStream::of).sum() +
Arrays.stream(updates).flatMapToInt(IntStream::of).sum() != partList.size()) {
+ int totalChanges =
+ Arrays.stream(inserts)
+ .flatMapToInt(IntStream::of)
+ .sum()
+ + Arrays.stream(updates)
+ .flatMapToInt(IntStream::of)
+ .sum();
+
+ if (totalChanges != partList.size()) {
throw new RuntimeException("PARTITION_PARAMS is corrupted, update
failed");
}
}
- private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource
jdbcResource, long txnid) throws MetaException {
- String writeConflictQuery =
jdbcResource.getSqlGenerator().addLimitClause(1,
- "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
- "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\",
\"COMMITTED\".\"WS_PARTITION\", " +
- "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\",
\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " +
- "\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" FROM
\"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " +
- "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND
\"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " +
- //For partitioned table we always track writes at partition level
(never at table)
- //and for non partitioned - always at table level, thus the same table
should never
- //have entries with partition key and w/o
- "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR
(\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL))
" +
- "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\" " +
//txns overlap; could replace ws_txnid
- // with txnid, though any decent DB should infer this
- "AND \"CUR\".\"WS_TXNID\"= :txnId " + //make sure RHS of join only has
rows we just inserted as
- // part of this commitTxn() op
- "AND \"COMMITTED\".\"WS_TXNID\" <> :txnId " + //and LHS only has
committed txns
- //U+U and U+D and D+D is a conflict and we don't currently track I in
WRITE_SET at all
- //it may seem like D+D should not be in conflict but consider 2
multi-stmt txns
- //where each does "delete X + insert X, where X is a row with the same
PK. This is
- //equivalent to an update of X but won't be in conflict unless D+D is
in conflict.
- //The same happens when Hive splits U=I+D early so it looks like 2
branches of a
- //multi-insert stmt (an Insert and a Delete branch). It also 'feels'
- // un-serializable to allow concurrent deletes
- "AND (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete) " +
- "AND \"CUR\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete))");
- LOG.debug("Going to execute query: <{}>", writeConflictQuery);
- return jdbcResource.getJdbcTemplate().query(writeConflictQuery,
- new MapSqlParameterSource()
- .addValue("txnId", txnid)
- .addValue("opUpdate", OperationType.UPDATE.getSqlConst())
- .addValue("opDelete", OperationType.DELETE.getSqlConst()),
- (ResultSet rs) -> {
- if(rs.next()) {
- return new WriteSetInfo(rs.getLong("WS_TXNID"),
rs.getLong("CUR_WS_COMMIT_ID"),
- rs.getLong("WS_COMMIT_ID"), rs.getString("CUR_OP"),
rs.getString("COMMITTED_OP"),
- rs.getString("WS_DATABASE"), rs.getString("WS_TABLE"),
rs.getString("WS_PARTITION"));
- } else {
- return null;
- }
- });
- }
-
- private void moveTxnComponentsToCompleted(MultiDataSourceJdbcResource
jdbcResource, long txnid, char isUpdateDelete) {
Review Comment:
Please restore the method to its original position for the same reasons
mentioned above.
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -267,36 +216,201 @@ public TxnType execute(MultiDataSourceJdbcResource
jdbcResource) throws MetaExce
* If RO < W, then there is no reads-from relationship.
* In replication flow we don't expect any write write conflict as it
should have been handled at source.
*/
- assert true;
+ return CommitInfo.empty();
+ }
+
+ String conflictSQLSuffix = String.format("""
+ FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND
"TC_OPERATION_TYPE" IN (%s, %s)
+ """, OperationType.UPDATE, OperationType.DELETE);
+ String writeSetInsertSql = """
+ INSERT INTO "WRITE_SET"
+ ("WS_DATABASE", "WS_TABLE", "WS_PARTITION", "WS_TXNID",
"WS_COMMIT_ID", "WS_OPERATION_TYPE")
+ SELECT DISTINCT
+ "TC_DATABASE", "TC_TABLE", "TC_PARTITION", "TC_TXNID",
+ :commitId,
+ "TC_OPERATION_TYPE"
+ """;
+
+ boolean isUpdateOrDelete =
Boolean.TRUE.equals(jdbcResource.getJdbcTemplate().query(
+ jdbcResource.getSqlGenerator()
+ .addLimitClause(1, "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix),
+ new MapSqlParameterSource()
+ .addValue("txnId", txnid),
+ ResultSet::next));
+
+ Long commitId = null;
+ long tempCommitId = 0L;
+
+ if (isUpdateOrDelete) {
+ tempCommitId = TxnUtils.generateTemporaryId();
+ //if here it means currently committing txn performed update/delete and
we should check WW conflict
+ /**
+ * "select distinct" is used below because
+ * 1. once we get to multi-statement txns, we only care to record that
something was updated once
+ * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried
by caller it may create
+ * duplicate entries in TXN_COMPONENTS
+ * but we want to add a PK on WRITE_SET which won't have unique rows w/o
this distinct
+ * even if it includes all of its columns
+ *
+ * First insert into write_set using a temporary commitID, which will be
updated in a separate call,
+ * see: {@link
#updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource, long, TxnType,
Long, long)}}.
+ * This should decrease the scope of the S4U lock on the next_txn_id
table.
+ */
+ Object undoWriteSetForCurrentTxn = context.createSavepoint();
+ jdbcResource.getJdbcTemplate().update(
+ writeSetInsertSql + (ConfVars.useMinHistoryLevel() ?
conflictSQLSuffix :
+ "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"= :txnId" + (
+ (txnType != TxnType.REBALANCE_COMPACTION) ? "" : " AND
\"TC_OPERATION_TYPE\" <> :type")),
+ new MapSqlParameterSource()
+ .addValue("txnId", txnid)
+ .addValue("type", OperationType.COMPACT.getSqlConst())
+ .addValue("commitId", tempCommitId));
+
+ /**
+ * This S4U will mutex with other commitTxn() and openTxns().
+ * -1 below makes txn intervals look like [3,3] [4,4] if all txns are
serial
+ * Note: it's possible to have several txns have the same commit id.
Suppose 3 txns start
+ * at the same time and no new txns start until all 3 commit.
+ * We could've incremented the sequence for commitId as well but it
doesn't add anything functionally.
+ */
+ new AcquireTxnLockFunction(false).execute(jdbcResource);
+ commitId = jdbcResource.execute(new GetHighWaterMarkHandler());
+
+ if (!rqst.isExclWriteEnabled()) {
+ /**
+ * see if there are any overlapping txns that wrote the same element,
i.e. have a conflict
+ * Since entire commit operation is mutexed wrt other start/commit ops,
+ * committed.ws_commit_id <= current.ws_commit_id for all txns
+ * thus if committed.ws_commit_id < current.ws_txnid, transactions do
NOT overlap
+ * For example, [17,20] is committed, [6,80] is being committed right
now - these overlap
+ * [17,20] committed and [21,21] committing now - these do not overlap.
+ * [17,18] committed and [18,19] committing now - these overlap (here
18 started while 17 was still running)
+ */
+ WriteSetInfo info = checkForWriteConflict(jdbcResource, txnid);
+ if (info != null) {
+ handleWriteConflict(jdbcResource, context,
undoWriteSetForCurrentTxn, info, txnid, commitId,
+ isReplayedReplTxn);
+ }
+ }
+ } else if (!ConfVars.useMinHistoryLevel()) {
+ jdbcResource.getJdbcTemplate().update(writeSetInsertSql + "FROM
\"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = :txnId",
+ new MapSqlParameterSource()
+ .addValue("txnId", txnid)
+ .addValue("commitId", jdbcResource.execute(new
GetHighWaterMarkHandler())));
}
+ return new CommitInfo(tempCommitId, commitId, isUpdateOrDelete);
+ }
+
+ private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource
jdbcResource, long txnid)
+ throws MetaException {
+ String writeConflictQuery =
jdbcResource.getSqlGenerator().addLimitClause(1,
+ "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
+ "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\",
\"COMMITTED\".\"WS_PARTITION\", " +
+ "\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\",
\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" " +
+ "FROM \"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " +
+ "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND
\"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " +
+ //For partitioned table we always track writes at partition level
(never at table)
+ //and for non partitioned - always at table level, thus the same table
should never
+ //have entries with partition key and w/o
+ "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR
(\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL))
" +
+ "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\" " +
//txns overlap; could replace ws_txnid
+ // with txnid, though any decent DB should infer this
+ "AND \"CUR\".\"WS_TXNID\"= :txnId " + //make sure RHS of join only has
rows we just inserted as
+ // part of this commitTxn() op
+ "AND \"COMMITTED\".\"WS_TXNID\" <> :txnId " + //and LHS only has
committed txns
+ //U+U and U+D and D+D is a conflict and we don't currently track I in
WRITE_SET at all
+ //it may seem like D+D should not be in conflict but consider 2
multi-stmt txns
+ //where each does "delete X + insert X, where X is a row with the same
PK. This is
+ //equivalent to an update of X but won't be in conflict unless D+D is
in conflict.
+ //The same happens when Hive splits U=I+D early so it looks like 2
branches of a
+ //multi-insert stmt (an Insert and a Delete branch). It also 'feels'
+ // un-serializable to allow concurrent deletes
+ "AND (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete) " +
+ "AND \"CUR\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete))");
+ LOG.debug("Going to execute query: <{}>", writeConflictQuery);
+ return jdbcResource.getJdbcTemplate().query(writeConflictQuery,
+ new MapSqlParameterSource()
+ .addValue("txnId", txnid)
+ .addValue("opUpdate", OperationType.UPDATE.getSqlConst())
+ .addValue("opDelete", OperationType.DELETE.getSqlConst()),
+ rs -> rs.next()
+ ? new WriteSetInfo(
+ rs.getLong("WS_TXNID"), rs.getLong("WS_COMMIT_ID"),
+ rs.getString("CUR_OP"), rs.getString("COMMITTED_OP"),
+ rs.getString("WS_DATABASE"), rs.getString("WS_TABLE"),
rs.getString("WS_PARTITION")
+ )
+ : null);
+ }
+
+ private void handleWriteConflict(MultiDataSourceJdbcResource jdbcResource,
TransactionContext context,
+ Object undoWriteSetForCurrentTxn, WriteSetInfo info, long txnid, Long
commitId, boolean isReplayedReplTxn)
+ throws MetaException, TxnAbortedException {
+ //found a conflict, so let's abort the txn
+ String committedTxn = "[" + JavaUtils.txnIdToString(info.txnId()) + "," +
info.commitId() + "]";
+ StringBuilder resource = new
StringBuilder(info.database()).append("/").append(info.table());
+ if (info.partition() != null) {
+ resource.append('/').append(info.partition());
+ }
+ String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," +
commitId + "]" + " due to a write conflict on " + resource +
+ " committed by " + committedTxn + " " + info.currOpType() + "/" +
info.opType();
+ //remove WRITE_SET info for current txn since it's about to abort
+ context.rollbackToSavepoint(undoWriteSetForCurrentTxn);
+ LOG.info(msg);
+ //TODO: should make abortTxns() write something into TXNS.TXN_META_INFO
about this
+ int count = new AbortTxnsFunction(Collections.singletonList(txnid), false,
false,
+ isReplayedReplTxn,
TxnErrorMsg.ABORT_WRITE_CONFLICT).execute(jdbcResource);
+ if (count != 1) {
+ throw new IllegalStateException(msg + " FAILED!");
+ }
+ throw new TxnAbortedException(msg);
+ }
+
+ private void handleCompletedTxnComponents(MultiDataSourceJdbcResource
jdbcResource, long txnid, TxnType txnType,
+ char isUpdateDelete, boolean isReplayedReplTxn, long sourceTxnId) throws
MetaException {
if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn &&
!MetaStoreServerUtils.isCompactionTxn(txnType)) {
moveTxnComponentsToCompleted(jdbcResource, txnid, isUpdateDelete);
+
} else if (isReplayedReplTxn) {
if (rqst.isSetWriteEventInfos() && !rqst.getWriteEventInfos().isEmpty())
{
- jdbcResource.execute(new InsertCompletedTxnComponentsCommand(txnid,
isUpdateDelete, rqst.getWriteEventInfos()));
+ jdbcResource.execute(
+ new InsertCompletedTxnComponentsCommand(txnid, isUpdateDelete,
rqst.getWriteEventInfos()));
}
- jdbcResource.execute(new DeleteReplTxnMapEntryCommand(sourceTxnId,
rqst.getReplPolicy()));
+ jdbcResource.execute(
+ new DeleteReplTxnMapEntryCommand(sourceTxnId, rqst.getReplPolicy()));
}
- updateWSCommitIdAndCleanUpMetadata(jdbcResource, txnid, txnType, commitId,
tempCommitId);
+ }
- if (rqst.isSetKeyValue()) {
- updateKeyValueAssociatedWithTxn(jdbcResource, rqst);
- }
+ private void moveTxnComponentsToCompleted(MultiDataSourceJdbcResource
jdbcResource, long txnid, char isUpdateDelete) {
+ // Move the record from txn_components into completed_txn_components so
that the compactor
+ // knows where to look to compact.
+ String query = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" (\"CTC_TXNID\",
\"CTC_DATABASE\", " +
+ "\"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\",
\"CTC_UPDATE_DELETE\") SELECT \"TC_TXNID\", " +
+ "\"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_WRITEID\",
:flag FROM \"TXN_COMPONENTS\" " +
+ "WHERE \"TC_TXNID\" = :txnid AND \"TC_OPERATION_TYPE\" <> :type";
+ //we only track compactor activity in TXN_COMPONENTS to handle the case
where the
+ //compactor txn aborts - so don't bother copying it to
COMPLETED_TXN_COMPONENTS
+ LOG.debug("Going to execute insert <{}>", query);
+ int affectedRows = jdbcResource.getJdbcTemplate().update(query,
+ new MapSqlParameterSource()
+ .addValue("flag", Character.toString(isUpdateDelete), Types.CHAR)
+ .addValue("txnid", txnid)
+ .addValue("type", OperationType.COMPACT.getSqlConst(),
Types.CHAR));
- if (!isHiveReplTxn) {
- createCommitNotificationEvent(jdbcResource, txnid , txnType,
txnWriteDetails);
+ if (affectedRows < 1) {
+ //this can be reasonable for an empty txn START/COMMIT or read-only txn
+ //also an IUD with DP that didn't match any rows.
+ LOG.info("Expected to move at least one record from txn_components to "
+ + "completed_txn_components when committing txn! {}",
JavaUtils.txnIdToString(txnid));
}
+ }
- LOG.debug("Going to commit");
-
- if (MetastoreConf.getBoolVar(jdbcResource.getConf(),
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-
Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_COMMITTED_TXNS).inc();
- }
- return txnType;
+ private static char toUpdateDeleteFlag(boolean isUpdateOrDelete) {
+ return isUpdateOrDelete ? 'Y' : 'N';
Review Comment:
How about putting this logic inside `CommitInfo` class:
```java
public char opFlag() {
return isUpdateOrDelete ? 'Y' : 'N';
}
```
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -390,150 +508,102 @@ private long
updateTableProp(MultiDataSourceJdbcResource jdbcResource, String ca
//TODO: would be better to replace with MERGE or UPSERT
String command;
- if (dbEntityParam.key == null) {
+ if (dbEntityParam.key() == null) {
command = "INSERT INTO \"TABLE_PARAMS\" VALUES (:tblId, :key, :value)";
- } else if (!dbEntityParam.value.equals(propValue)) {
+ } else if (!dbEntityParam.value().equals(propValue)) {
command = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE
\"TBL_ID\" = :dbId AND \"PARAM_KEY\" = :key";
} else {
LOG.info("Database property: {} with value: {} already updated for db:
{}", prop, propValue, db);
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating {} for table: {} using command {}", prop, table,
command);
}
SqlParameterSource params = new MapSqlParameterSource()
- .addValue("tblId", dbEntityParam.id)
+ .addValue("tblId", dbEntityParam.id())
.addValue("key", prop)
.addValue("value", propValue);
if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
//only one row insert or update should happen
throw new RuntimeException("TABLE_PARAMS is corrupted for table: " +
table);
}
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
private void updatePartitionProp(MultiDataSourceJdbcResource jdbcResource,
long tableId,
- List<String> partList, String prop, String
propValue) {
+ List<String> partList, String prop, String propValue) {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
//language=SQL
- prefix.append(
- "SELECT p.\"PART_ID\", pp.\"PARAM_KEY\", pp.\"PARAM_VALUE\" FROM
\"PARTITION_PARAMS\" pp\n" +
- "RIGHT JOIN \"PARTITIONS\" p ON pp.\"PART_ID\" = p.\"PART_ID\" WHERE
p.\"TBL_ID\" = :tblId AND pp.\"PARAM_KEY\" = :key");
+ prefix.append("""
+ SELECT p."PART_ID", pp."PARAM_KEY", pp."PARAM_VALUE" FROM
"PARTITION_PARAMS" pp
+ RIGHT JOIN "PARTITIONS" p ON pp."PART_ID" = p."PART_ID" WHERE
p."TBL_ID" = :tblId AND pp."PARAM_KEY" = :key""");
// Populate the complete query with provided prefix and suffix
TxnUtils.buildQueryWithINClauseStrings(jdbcResource.getConf(), queries,
prefix, suffix, partList,
"\"PART_NAME\"", true, false);
+
SqlParameterSource params = new MapSqlParameterSource()
.addValue("tblId", tableId)
.addValue("key", prop);
+
+ RowMapper<DbEntityParam> mapper = (rs, rowNum) ->
+ new DbEntityParam(
+ rs.getLong("PART_ID"), rs.getString("PARAM_KEY"),
rs.getString("PARAM_VALUE")
+ );
List<DbEntityParam> partitionParams = new ArrayList<>();
- for(String query : queries) {
+
+ for (String query : queries) {
if (LOG.isDebugEnabled()) {
LOG.debug("Going to execute query <" + query + ">");
}
- jdbcResource.getJdbcTemplate().query(query, params,
- (ResultSet rs) -> {
- while (rs.next()) {
- partitionParams.add(new DbEntityParam(rs.getLong("PART_ID"),
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")));
- }
- });
+ partitionParams.addAll(
+ jdbcResource.getJdbcTemplate().query(query, params, mapper)
+ );
}
//TODO: would be better to replace with MERGE or UPSERT
int maxBatchSize = MetastoreConf.getIntVar(jdbcResource.getConf(),
MetastoreConf.ConfVars.JDBC_MAX_BATCH_SIZE);
//all insert in one batch
int[][] inserts =
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"INSERT INTO \"PARTITION_PARAMS\" VALUES (?, ?, ?)",
- partitionParams.stream().filter(p -> p.key ==
null).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() == null)
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
- ps.setLong(1, argument.id);
- ps.setString(2, argument.key);
+ ps.setLong(1, argument.id());
+ ps.setString(2, argument.key());
ps.setString(3, propValue);
});
//all update in one batch
int[][] updates
=jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"PART_ID\"
= ? AND \"PARAM_KEY\" = ?",
- partitionParams.stream().filter(p -> p.key != null &&
!propValue.equals(p.value)).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() != null && !propValue.equals(p.value()))
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
ps.setString(1, propValue);
- ps.setLong(2, argument.id);
- ps.setString(3, argument.key);
+ ps.setLong(2, argument.id());
+ ps.setString(3, argument.key());
});
- if (Arrays.stream(inserts).flatMapToInt(IntStream::of).sum() +
Arrays.stream(updates).flatMapToInt(IntStream::of).sum() != partList.size()) {
+ int totalChanges =
+ Arrays.stream(inserts)
+ .flatMapToInt(IntStream::of)
+ .sum()
+ + Arrays.stream(updates)
+ .flatMapToInt(IntStream::of)
+ .sum();
+
+ if (totalChanges != partList.size()) {
throw new RuntimeException("PARTITION_PARAMS is corrupted, update
failed");
}
}
- private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource
jdbcResource, long txnid) throws MetaException {
- String writeConflictQuery =
jdbcResource.getSqlGenerator().addLimitClause(1,
- "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
- "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\",
\"COMMITTED\".\"WS_PARTITION\", " +
- "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\",
\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " +
Review Comment:
Why do we remove `CUR_WS_COMMIT_ID`?
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -94,169 +97,115 @@ public CommitTxnFunction(CommitTxnRequest rqst,
List<TransactionalMetaStoreEvent
@Override
public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws
MetaException, NoSuchTxnException, TxnAbortedException {
- char isUpdateDelete = 'N';
long txnid = rqst.getTxnid();
long sourceTxnId = -1;
- boolean isReplayedReplTxn =
TxnType.REPL_CREATED.equals(rqst.getTxn_type());
- boolean isHiveReplTxn = rqst.isSetReplPolicy() &&
TxnType.DEFAULT.equals(rqst.getTxn_type());
+ boolean isReplayedReplTxn = TxnType.REPL_CREATED == rqst.getTxn_type();
+ boolean isHiveReplTxn = rqst.isSetReplPolicy() && TxnType.DEFAULT ==
rqst.getTxn_type();
//Find the write details for this transaction.
//Doing it here before the metadata tables are updated below.
- List<TxnWriteDetails> txnWriteDetails = new ArrayList<>();
-
- if (!isHiveReplTxn) {
- txnWriteDetails = jdbcResource.execute(new
GetWriteIdsMappingForTxnIdsHandler(Set.of(rqst.getTxnid())));
+ List<TxnWriteDetails> txnWriteDetails = loadTxnWriteDetails(jdbcResource,
isHiveReplTxn, txnid);
- }
// Get the current TXN
TransactionContext context =
jdbcResource.getTransactionManager().getActiveTransaction();
- Long commitId = null;
if (rqst.isSetReplLastIdInfo()) {
updateReplId(jdbcResource, rqst.getReplLastIdInfo());
}
if (isReplayedReplTxn) {
assert (rqst.isSetReplPolicy());
+ txnid = resolveTargetTxnId(jdbcResource, rqst.getTxnid());
sourceTxnId = rqst.getTxnid();
- List<Long> targetTxnIds = jdbcResource.execute(new
TargetTxnIdListHandler(rqst.getReplPolicy(),
Collections.singletonList(sourceTxnId)));
- if (targetTxnIds.isEmpty()) {
- // Idempotent case where txn was already closed or commit txn event
received without
- // corresponding open txn event.
- LOG.info("Target txn id is missing for source txn id : {} and repl
policy {}", sourceTxnId,
- rqst.getReplPolicy());
- throw new RollbackException(null);
- }
- assert targetTxnIds.size() == 1;
- txnid = targetTxnIds.getFirst();
}
- /**
- * Runs at READ_COMMITTED with S4U on TXNS row for "txnid". S4U ensures
that no other
- * operation can change this txn (such acquiring locks). While lock() and
commitTxn()
- * should not normally run concurrently (for same txn) but could due to
bugs in the client
- * which could then corrupt internal transaction manager state. Also
competes with abortTxn().
- */
- TxnType txnType = jdbcResource.execute(new
GetOpenTxnTypeAndLockHandler(jdbcResource.getSqlGenerator(), txnid));
+ TxnType txnType = getOpenTxnTypeAndLock(jdbcResource, txnid,
isReplayedReplTxn);
if (txnType == null) {
- //if here, txn was not found (in expected state)
- TxnStatus actualTxnStatus = jdbcResource.execute(new
FindTxnStateHandler(txnid));
- if (actualTxnStatus == TxnStatus.COMMITTED) {
- if (isReplayedReplTxn) {
- // in case of replication, idempotent is taken care by getTargetTxnId
- LOG.warn("Invalid state COMMITTED for transactions started using
replication replay task");
- }
- /**
- * This makes the operation idempotent
- * (assume that this is most likely due to retry logic)
- */
- LOG.info("Nth commitTxn({}) msg", JavaUtils.txnIdToString(txnid));
- return null;
- }
- TxnUtils.raiseTxnUnexpectedState(actualTxnStatus, txnid);
+ return null;
}
- String conflictSQLSuffix = String.format("""
- FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND
"TC_OPERATION_TYPE" IN (%s, %s)
- """, OperationType.UPDATE, OperationType.DELETE);
- long tempCommitId = TxnUtils.generateTemporaryId();
+ CommitInfo commitInfo = prepareWriteSetAndCheckConflicts(
+ jdbcResource, context, txnid, txnType, isReplayedReplTxn);
+ char isUpdateDelete = toUpdateDeleteFlag(commitInfo.isUpdateOrDelete());
- if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) {
- if (!ConfVars.useMinHistoryWriteId()) {
- new AcquireTxnLockFunction(false).execute(jdbcResource);
- }
- commitId = jdbcResource.execute(new GetHighWaterMarkHandler());
Review Comment:
Was this lock and watermark logic moved somewhere else, or was it removed
completely?
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -390,150 +508,102 @@ private long
updateTableProp(MultiDataSourceJdbcResource jdbcResource, String ca
//TODO: would be better to replace with MERGE or UPSERT
String command;
- if (dbEntityParam.key == null) {
+ if (dbEntityParam.key() == null) {
command = "INSERT INTO \"TABLE_PARAMS\" VALUES (:tblId, :key, :value)";
- } else if (!dbEntityParam.value.equals(propValue)) {
+ } else if (!dbEntityParam.value().equals(propValue)) {
command = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE
\"TBL_ID\" = :dbId AND \"PARAM_KEY\" = :key";
} else {
LOG.info("Database property: {} with value: {} already updated for db:
{}", prop, propValue, db);
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating {} for table: {} using command {}", prop, table,
command);
}
SqlParameterSource params = new MapSqlParameterSource()
- .addValue("tblId", dbEntityParam.id)
+ .addValue("tblId", dbEntityParam.id())
.addValue("key", prop)
.addValue("value", propValue);
if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
//only one row insert or update should happen
throw new RuntimeException("TABLE_PARAMS is corrupted for table: " +
table);
}
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
private void updatePartitionProp(MultiDataSourceJdbcResource jdbcResource,
long tableId,
- List<String> partList, String prop, String
propValue) {
+ List<String> partList, String prop, String propValue) {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
//language=SQL
- prefix.append(
- "SELECT p.\"PART_ID\", pp.\"PARAM_KEY\", pp.\"PARAM_VALUE\" FROM
\"PARTITION_PARAMS\" pp\n" +
- "RIGHT JOIN \"PARTITIONS\" p ON pp.\"PART_ID\" = p.\"PART_ID\" WHERE
p.\"TBL_ID\" = :tblId AND pp.\"PARAM_KEY\" = :key");
+ prefix.append("""
+ SELECT p."PART_ID", pp."PARAM_KEY", pp."PARAM_VALUE" FROM
"PARTITION_PARAMS" pp
+ RIGHT JOIN "PARTITIONS" p ON pp."PART_ID" = p."PART_ID" WHERE
p."TBL_ID" = :tblId AND pp."PARAM_KEY" = :key""");
// Populate the complete query with provided prefix and suffix
TxnUtils.buildQueryWithINClauseStrings(jdbcResource.getConf(), queries,
prefix, suffix, partList,
"\"PART_NAME\"", true, false);
+
SqlParameterSource params = new MapSqlParameterSource()
.addValue("tblId", tableId)
.addValue("key", prop);
+
+ RowMapper<DbEntityParam> mapper = (rs, rowNum) ->
+ new DbEntityParam(
+ rs.getLong("PART_ID"), rs.getString("PARAM_KEY"),
rs.getString("PARAM_VALUE")
+ );
List<DbEntityParam> partitionParams = new ArrayList<>();
- for(String query : queries) {
+
+ for (String query : queries) {
if (LOG.isDebugEnabled()) {
LOG.debug("Going to execute query <" + query + ">");
}
- jdbcResource.getJdbcTemplate().query(query, params,
- (ResultSet rs) -> {
- while (rs.next()) {
- partitionParams.add(new DbEntityParam(rs.getLong("PART_ID"),
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")));
- }
- });
+ partitionParams.addAll(
+ jdbcResource.getJdbcTemplate().query(query, params, mapper)
+ );
}
//TODO: would be better to replace with MERGE or UPSERT
int maxBatchSize = MetastoreConf.getIntVar(jdbcResource.getConf(),
MetastoreConf.ConfVars.JDBC_MAX_BATCH_SIZE);
//all insert in one batch
int[][] inserts =
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"INSERT INTO \"PARTITION_PARAMS\" VALUES (?, ?, ?)",
- partitionParams.stream().filter(p -> p.key ==
null).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() == null)
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
- ps.setLong(1, argument.id);
- ps.setString(2, argument.key);
+ ps.setLong(1, argument.id());
+ ps.setString(2, argument.key());
ps.setString(3, propValue);
});
//all update in one batch
int[][] updates
=jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"PART_ID\"
= ? AND \"PARAM_KEY\" = ?",
- partitionParams.stream().filter(p -> p.key != null &&
!propValue.equals(p.value)).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() != null && !propValue.equals(p.value()))
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
ps.setString(1, propValue);
- ps.setLong(2, argument.id);
- ps.setString(3, argument.key);
+ ps.setLong(2, argument.id());
+ ps.setString(3, argument.key());
});
- if (Arrays.stream(inserts).flatMapToInt(IntStream::of).sum() +
Arrays.stream(updates).flatMapToInt(IntStream::of).sum() != partList.size()) {
+ int totalChanges =
+ Arrays.stream(inserts)
+ .flatMapToInt(IntStream::of)
+ .sum()
+ + Arrays.stream(updates)
+ .flatMapToInt(IntStream::of)
+ .sum();
+
+ if (totalChanges != partList.size()) {
throw new RuntimeException("PARTITION_PARAMS is corrupted, update
failed");
}
}
- private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource
jdbcResource, long txnid) throws MetaException {
Review Comment:
In addition, since the SQL query is mostly intact, avoid adding/removing
insignificant spaces/tabs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]