deniskuzZ commented on PR #6159:
URL: https://github.com/apache/hive/pull/6159#issuecomment-3523121208

   patch example
   ````
   Subject: [PATCH] r
   ---
   Index: 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
   --- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
  (revision c729ea19807c0c0ca6f1df4870fff49660e95a85)
   +++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
  (date 1762968755997)
   @@ -27,6 +27,7 @@
    import java.io.IOException;
    import java.net.InetAddress;
    import java.sql.Connection;
   +import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.SQLIntegrityConstraintViolationException;
    import java.sql.Statement;
   @@ -9171,19 +9172,10 @@
          int maxRetries = MetastoreConf.getIntVar(conf, 
ConfVars.METASTORE_S4U_NOWAIT_MAX_RETRIES);
          long sleepInterval = MetastoreConf.getTimeVar(conf,
              ConfVars.METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL, 
TimeUnit.MILLISECONDS);
   +      
   +      final String versionParamKey = "hive.metastore.table.version";
   +      
          Map<String, String> result = new RetryingExecutor<>(maxRetries, () -> 
{
   -        Ref<Exception> exceptionRef = new Ref<>();
   -        String savePoint = "uts_" + 
ThreadLocalRandom.current().nextInt(10000) + "_" + System.nanoTime();
   -        setTransactionSavePoint(savePoint);
   -        executePlainSQL(
   -            sqlGenerator.addForUpdateNoWait("SELECT \"TBL_ID\" FROM 
\"TBLS\" WHERE \"TBL_ID\" = " + mTable.getId()),
   -            exception -> {
   -              rollbackTransactionToSavePoint(savePoint);
   -              exceptionRef.t = exception;
   -            });
   -        if (exceptionRef.t != null) {
   -          throw new RetryingExecutor.RetryException(exceptionRef.t);
   -        }
            pm.refresh(mTable);
            Table table = convertToTable(mTable);
            List<String> colNames = new ArrayList<>();
   @@ -9230,7 +9222,29 @@
                oldt.setWriteId(writeId);
              }
            }
   +        // ✅ OPTIMISTIC LOCKING: Read current version, increment, and 
prepare for atomic check
   +        String currentVersionStr = 
table.getParameters().get(versionParamKey);
   +        long currentVersion = (currentVersionStr != null ? 
Long.parseLong(currentVersionStr) : 0L);
   +        long newVersion = currentVersion + 1;
   +        newParams.put(versionParamKey, String.valueOf(newVersion));
   +        
            oldt.setParameters(newParams);
   +        
   +        // ✅ SINGLE STATEMENT: Atomically increment version with conflict 
detection
   +        // This UPDATE will fail if another transaction changed the version
   +        int updCount = incrementTableVersionAtomic(mTable.getId(), 
versionParamKey, currentVersion, newVersion);
   +        
   +        if (updCount == 0) {
   +          // Concurrent modification detected - retry
   +          LOG.debug("Table {}.{} was modified by another transaction 
(version {} changed), retrying...", 
   +              dbname, name, currentVersion);
   +          throw new RetryingExecutor.RetryException(
   +              new MetaException("Optimistic lock failure - table version 
changed"));
   +        }
   +        
   +        LOG.debug("Successfully updated table {}.{} version: {} -> {}", 
   +            dbname, name, currentVersion, newVersion);
   +        
            return newParams;
          }).onRetry(e -> e instanceof RetryingExecutor.RetryException)
            
.commandName("updateTableColumnStatistics").sleepInterval(sleepInterval, 
interval ->
   @@ -14183,4 +14197,113 @@
          }
        }
      }
   +
   +  /**
   +   * Bumps the version parameter of a table using an optimistic
   +   * compare-and-set against {@code TABLE_PARAMS}.
   +   *
   +   * <p>Despite the name this is not a single statement; up to three are
   +   * issued:
   +   * <ol>
   +   *   <li>{@code UPDATE ... WHERE PARAM_VALUE = expectedVersion}. One
   +   *       affected row means the version was bumped.</li>
   +   *   <li>If nothing was updated, a {@code SELECT} tells a version
   +   *       mismatch (row exists, conflict) apart from a missing row.</li>
   +   *   <li>If the row is missing and {@code expectedVersion} is 0, the row
   +   *       is {@code INSERT}ed with {@code newVersion}.</li>
   +   * </ol>
   +   *
   +   * @param tblId the table ID
   +   * @param versionParamKey the parameter key that stores the version; it
   +   *        is concatenated into the SQL text, so it must be a trusted
   +   *        constant and never user input
   +   * @param expectedVersion the version the caller read (0 for first time)
   +   * @param newVersion the new version to set
   +   * @return 1 if the version was written, 0 if a conflict was detected
   +   * @throws MetaException if a statement fails for any reason other than
   +   *         a concurrent insert of the version row
   +   */
   +  private int incrementTableVersionAtomic(long tblId,
   +      String versionParamKey, long expectedVersion, long newVersion)
   +      throws MetaException {
   +    try {
   +      // Compare-and-set: only matches while the stored version is still
   +      // the one the caller read.
   +      String updateSQL = "UPDATE \"TABLE_PARAMS\" " +
   +          "SET \"PARAM_VALUE\" = '" + newVersion + "' " +
   +          "WHERE \"TBL_ID\" = " + tblId +
   +          " AND \"PARAM_KEY\" = '" + versionParamKey + "' " +
   +          " AND \"PARAM_VALUE\" = '" + expectedVersion + "'";
   +      if (executePlainSQLUpdate(updateSQL) == 1) {
   +        return 1;
   +      }
   +
   +      // Nothing updated: either the version moved on (conflict) or the
   +      // version row has never been written for this table.
   +      String checkSQL =
   +          "SELECT \"PARAM_VALUE\" FROM \"TABLE_PARAMS\" " +
   +          "WHERE \"TBL_ID\" = " + tblId +
   +          " AND \"PARAM_KEY\" = '" + versionParamKey + "'";
   +      String existingVersion = executePlainSQLQuery(checkSQL,
   +          rs -> rs.next() ? rs.getString(1) : null);
   +
   +      if (existingVersion != null) {
   +        // Row exists but version doesn't match - CONFLICT!
   +        LOG.debug("Version conflict for tblId={}: expected={}, actual={}",
   +            tblId, expectedVersion, existingVersion);
   +        return 0;
   +      }
   +
   +      if (expectedVersion != 0) {
   +        // The caller read a version, yet no row is stored any more.
   +        LOG.warn("Inconsistent state: expected version {} but no row "
   +            + "exists for tblId={}", expectedVersion, tblId);
   +        return 0;
   +      }
   +
   +      // First write of the version row for this table.
   +      String insertSQL = "INSERT INTO \"TABLE_PARAMS\" " +
   +          "(\"TBL_ID\", \"PARAM_KEY\", \"PARAM_VALUE\") " +
   +          "VALUES (" + tblId + ", '" + versionParamKey + "', '" +
   +          newVersion + "')";
   +      try {
   +        return executePlainSQLUpdate(insertSQL) > 0 ? 1 : 0;
   +      } catch (SQLException e) {
   +        // Only a duplicate-key failure means another writer created the
   +        // row first. SQLState class 23 is the standard integrity
   +        // constraint class, for drivers that do not throw the dedicated
   +        // subclass. Anything else is a real error and must not be
   +        // reported as a conflict.
   +        // NOTE(review): assumes TABLE_PARAMS is unique on
   +        // (TBL_ID, PARAM_KEY) - confirm against the schema.
   +        String sqlState = e.getSQLState();
   +        boolean duplicateKey =
   +            e instanceof SQLIntegrityConstraintViolationException
   +                || (sqlState != null && sqlState.startsWith("23"));
   +        if (!duplicateKey) {
   +          throw e;
   +        }
   +        LOG.debug("Concurrent insert detected for tblId={}", tblId);
   +        return 0;
   +      }
   +    } catch (Exception e) {
   +      LOG.error("Failed to increment table version for tblId={}", tblId, e);
   +      // The exception is built from a message only, so attach the cause
   +      // explicitly to keep the original stack trace for callers.
   +      MetaException me = new MetaException(
   +          "Failed to increment table version: " + e.getMessage());
   +      me.initCause(e);
   +      throw me;
   +    }
   +  }
   +
   +  /**
   +   * Executes a plain SQL update statement on the JDBC connection that
   +   * backs the current PersistenceManager.
   +   *
   +   * @param sql the complete SQL text to execute
   +   * @return the row count reported by {@link Statement#executeUpdate}
   +   * @throws Exception if the statement cannot be created or executed
   +   */
   +  private int executePlainSQLUpdate(String sql) throws Exception {
   +    JDOConnection jdoConn = pm.getDataStoreConnection();
   +    // getNativeConnection() is declared to return Object, so it needs a
   +    // cast.
   +    Connection conn = (Connection) jdoConn.getNativeConnection();
   +    try (Statement statement = conn.createStatement()) {
   +      return statement.executeUpdate(sql);
   +    } finally {
   +      // Release the JDO handle; the native connection stays owned by the
   +      // persistence layer and must not be closed directly.
   +      jdoConn.close();
   +    }
   +  }
   +
   +  /**
   +   * Executes a plain SQL query on the JDBC connection that backs the
   +   * current PersistenceManager and passes the open result set to
   +   * {@code extractor}.
   +   *
   +   * @param sql the complete SQL text to execute
   +   * @param extractor reads the value to return from the result set; the
   +   *        result set is closed as soon as the extractor returns
   +   * @param <T> the type produced by the extractor
   +   * @return whatever {@code extractor} returns
   +   * @throws Exception if the query fails or the extractor throws
   +   */
   +  private <T> T executePlainSQLQuery(String sql,
   +      ResultExtractor<T> extractor) throws Exception {
   +    JDOConnection jdoConn = pm.getDataStoreConnection();
   +    // getNativeConnection() is declared to return Object, so it needs a
   +    // cast.
   +    Connection conn = (Connection) jdoConn.getNativeConnection();
   +    try (Statement statement = conn.createStatement();
   +         ResultSet rs = statement.executeQuery(sql)) {
   +      return extractor.extract(rs);
   +    } finally {
   +      // Release the JDO handle; the native connection stays owned by the
   +      // persistence layer and must not be closed directly.
   +      jdoConn.close();
   +    }
   +  }
   +
   +  /**
   +   * Converts an open {@link ResultSet} into a value. Unlike
   +   * {@code java.util.function.Function}, it may throw checked exceptions.
   +   */
   +  @FunctionalInterface
   +  private interface ResultExtractor<T> {
   +    T extract(ResultSet rs) throws Exception;
   +  }
    }
   ````


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to