deniskuzZ commented on code in PR #6290:
URL: https://github.com/apache/hive/pull/6290#discussion_r2841438634
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -390,150 +508,102 @@ private long
updateTableProp(MultiDataSourceJdbcResource jdbcResource, String ca
//TODO: would be better to replace with MERGE or UPSERT
String command;
- if (dbEntityParam.key == null) {
+ if (dbEntityParam.key() == null) {
command = "INSERT INTO \"TABLE_PARAMS\" VALUES (:tblId, :key, :value)";
- } else if (!dbEntityParam.value.equals(propValue)) {
+ } else if (!dbEntityParam.value().equals(propValue)) {
command = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE
\"TBL_ID\" = :dbId AND \"PARAM_KEY\" = :key";
} else {
LOG.info("Database property: {} with value: {} already updated for db:
{}", prop, propValue, db);
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating {} for table: {} using command {}", prop, table,
command);
}
SqlParameterSource params = new MapSqlParameterSource()
- .addValue("tblId", dbEntityParam.id)
+ .addValue("tblId", dbEntityParam.id())
.addValue("key", prop)
.addValue("value", propValue);
if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
//only one row insert or update should happen
throw new RuntimeException("TABLE_PARAMS is corrupted for table: " +
table);
}
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
private void updatePartitionProp(MultiDataSourceJdbcResource jdbcResource,
long tableId,
- List<String> partList, String prop, String
propValue) {
+ List<String> partList, String prop, String propValue) {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
//language=SQL
- prefix.append(
- "SELECT p.\"PART_ID\", pp.\"PARAM_KEY\", pp.\"PARAM_VALUE\" FROM
\"PARTITION_PARAMS\" pp\n" +
- "RIGHT JOIN \"PARTITIONS\" p ON pp.\"PART_ID\" = p.\"PART_ID\" WHERE
p.\"TBL_ID\" = :tblId AND pp.\"PARAM_KEY\" = :key");
+ prefix.append("""
+ SELECT p."PART_ID", pp."PARAM_KEY", pp."PARAM_VALUE" FROM
"PARTITION_PARAMS" pp
+ RIGHT JOIN "PARTITIONS" p ON pp."PART_ID" = p."PART_ID" WHERE
p."TBL_ID" = :tblId AND pp."PARAM_KEY" = :key""");
// Populate the complete query with provided prefix and suffix
TxnUtils.buildQueryWithINClauseStrings(jdbcResource.getConf(), queries,
prefix, suffix, partList,
"\"PART_NAME\"", true, false);
+
SqlParameterSource params = new MapSqlParameterSource()
.addValue("tblId", tableId)
.addValue("key", prop);
+
+ RowMapper<DbEntityParam> mapper = (rs, rowNum) ->
+ new DbEntityParam(
+ rs.getLong("PART_ID"), rs.getString("PARAM_KEY"),
rs.getString("PARAM_VALUE")
+ );
List<DbEntityParam> partitionParams = new ArrayList<>();
- for(String query : queries) {
+
+ for (String query : queries) {
if (LOG.isDebugEnabled()) {
LOG.debug("Going to execute query <" + query + ">");
}
- jdbcResource.getJdbcTemplate().query(query, params,
- (ResultSet rs) -> {
- while (rs.next()) {
- partitionParams.add(new DbEntityParam(rs.getLong("PART_ID"),
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")));
- }
- });
+ partitionParams.addAll(
+ jdbcResource.getJdbcTemplate().query(query, params, mapper)
+ );
}
//TODO: would be better to replace with MERGE or UPSERT
int maxBatchSize = MetastoreConf.getIntVar(jdbcResource.getConf(),
MetastoreConf.ConfVars.JDBC_MAX_BATCH_SIZE);
//all insert in one batch
int[][] inserts =
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"INSERT INTO \"PARTITION_PARAMS\" VALUES (?, ?, ?)",
- partitionParams.stream().filter(p -> p.key ==
null).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() == null)
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
- ps.setLong(1, argument.id);
- ps.setString(2, argument.key);
+ ps.setLong(1, argument.id());
+ ps.setString(2, argument.key());
ps.setString(3, propValue);
});
//all update in one batch
int[][] updates
=jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"PART_ID\"
= ? AND \"PARAM_KEY\" = ?",
- partitionParams.stream().filter(p -> p.key != null &&
!propValue.equals(p.value)).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() != null && !propValue.equals(p.value()))
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
ps.setString(1, propValue);
- ps.setLong(2, argument.id);
- ps.setString(3, argument.key);
+ ps.setLong(2, argument.id());
+ ps.setString(3, argument.key());
});
- if (Arrays.stream(inserts).flatMapToInt(IntStream::of).sum() +
Arrays.stream(updates).flatMapToInt(IntStream::of).sum() != partList.size()) {
+ int totalChanges =
+ Arrays.stream(inserts)
+ .flatMapToInt(IntStream::of)
+ .sum()
+ + Arrays.stream(updates)
+ .flatMapToInt(IntStream::of)
+ .sum();
+
+ if (totalChanges != partList.size()) {
throw new RuntimeException("PARTITION_PARAMS is corrupted, update
failed");
}
}
- private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource
jdbcResource, long txnid) throws MetaException {
- String writeConflictQuery =
jdbcResource.getSqlGenerator().addLimitClause(1,
- "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
- "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\",
\"COMMITTED\".\"WS_PARTITION\", " +
- "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\",
\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " +
Review Comment:
   It's not used anywhere; consider removing it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]