nsivabalan commented on code in PR #17940:
URL: https://github.com/apache/hudi/pull/17940#discussion_r2760820893


##########
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestMetadataCommand.java:
##########
@@ -105,4 +109,136 @@ public void testMetadataDelete() throws Exception {
     metaClient = HoodieTableMetaClient.reload(metaClient);
     assertTrue(metaClient.getTableConfig().getMetadataPartitions().isEmpty());
   }
+
+  @Test
+  public void testGetRecordIndexInfo() throws Exception {
+    HoodieTableMetaClient.newTableBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+        .setTableName(tableName())
+        
.setArchiveLogFolder(HoodieTableConfig.TIMELINE_HISTORY_PATH.defaultValue())
+        .setPayloadClassName("org.apache.hudi.common.model.HoodieAvroPayload")
+        .setPartitionFields("partition_path")
+        .setRecordKeyFields("_row_key")
+        .setKeyGeneratorClassProp(SimpleKeyGenerator.class.getCanonicalName())
+        .initTable(HoodieCLI.conf.newInstance(), tablePath);
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] 
{DEFAULT_FIRST_PARTITION_PATH});
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+        .withEnableGlobalRecordLevelIndex(true).build();
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(tablePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withMetadataConfig(metadataConfig)
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context(), 
config)) {
+      String newCommitTime = client.startCommit();
+      int numRecords = 10;
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 
numRecords);
+      JavaRDD<HoodieRecord> writeRecords = 
context().getJavaSparkContext().parallelize(records, 1);
+      List<WriteStatus> result = client.upsert(writeRecords, 
newCommitTime).collect();
+      client.commit(newCommitTime, jsc().parallelize(result));
+      Assertions.assertNoWriteErrors(result);
+
+      // Get a record key from the inserted records
+      String recordKey = records.get(0).getRecordKey();
+
+      // Connect to the table
+      new TableCommand().connect(tablePath, false, 0, 0, 0,
+          "WAIT_TO_ADJUST_SKEW", 200L, false);
+
+      // Verify record index is enabled in table config
+      HoodieTableMetaClient metaClient = createMetaClient(jsc(), tablePath);
+      
assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX));
+
+      // Validate entries in the Global RLI.
+      validateRecordIndexOutput(recordKey, Option.empty(), newCommitTime, 
DEFAULT_FIRST_PARTITION_PATH);

Review Comment:
   Can we also try looking up a non-existent record key?



##########
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestMetadataCommand.java:
##########
@@ -105,4 +109,136 @@ public void testMetadataDelete() throws Exception {
     metaClient = HoodieTableMetaClient.reload(metaClient);
     assertTrue(metaClient.getTableConfig().getMetadataPartitions().isEmpty());
   }
+
+  @Test
+  public void testGetRecordIndexInfo() throws Exception {
+    HoodieTableMetaClient.newTableBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+        .setTableName(tableName())
+        
.setArchiveLogFolder(HoodieTableConfig.TIMELINE_HISTORY_PATH.defaultValue())
+        .setPayloadClassName("org.apache.hudi.common.model.HoodieAvroPayload")
+        .setPartitionFields("partition_path")
+        .setRecordKeyFields("_row_key")
+        .setKeyGeneratorClassProp(SimpleKeyGenerator.class.getCanonicalName())
+        .initTable(HoodieCLI.conf.newInstance(), tablePath);
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] 
{DEFAULT_FIRST_PARTITION_PATH});
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+        .withEnableGlobalRecordLevelIndex(true).build();
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(tablePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withMetadataConfig(metadataConfig)
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context(), 
config)) {
+      String newCommitTime = client.startCommit();
+      int numRecords = 10;
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 
numRecords);
+      JavaRDD<HoodieRecord> writeRecords = 
context().getJavaSparkContext().parallelize(records, 1);
+      List<WriteStatus> result = client.upsert(writeRecords, 
newCommitTime).collect();
+      client.commit(newCommitTime, jsc().parallelize(result));
+      Assertions.assertNoWriteErrors(result);
+
+      // Get a record key from the inserted records
+      String recordKey = records.get(0).getRecordKey();
+
+      // Connect to the table
+      new TableCommand().connect(tablePath, false, 0, 0, 0,
+          "WAIT_TO_ADJUST_SKEW", 200L, false);
+
+      // Verify record index is enabled in table config
+      HoodieTableMetaClient metaClient = createMetaClient(jsc(), tablePath);
+      
assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX));
+
+      // Validate entries in the Global RLI.
+      validateRecordIndexOutput(recordKey, Option.empty(), newCommitTime, 
DEFAULT_FIRST_PARTITION_PATH);
+    }
+  }
+
+  @Test
+  public void testGetRecordIndexInfoForNonGlobalRLI() throws Exception {

Review Comment:
   Let's name the method `testGetRecordIndexInfoForPartitionedRLI`.



##########
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestMetadataCommand.java:
##########
@@ -105,4 +109,136 @@ public void testMetadataDelete() throws Exception {
     metaClient = HoodieTableMetaClient.reload(metaClient);
     assertTrue(metaClient.getTableConfig().getMetadataPartitions().isEmpty());
   }
+
+  @Test
+  public void testGetRecordIndexInfo() throws Exception {
+    HoodieTableMetaClient.newTableBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+        .setTableName(tableName())
+        
.setArchiveLogFolder(HoodieTableConfig.TIMELINE_HISTORY_PATH.defaultValue())
+        .setPayloadClassName("org.apache.hudi.common.model.HoodieAvroPayload")
+        .setPartitionFields("partition_path")
+        .setRecordKeyFields("_row_key")
+        .setKeyGeneratorClassProp(SimpleKeyGenerator.class.getCanonicalName())
+        .initTable(HoodieCLI.conf.newInstance(), tablePath);
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] 
{DEFAULT_FIRST_PARTITION_PATH});
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+        .withEnableGlobalRecordLevelIndex(true).build();
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(tablePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withMetadataConfig(metadataConfig)
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context(), 
config)) {
+      String newCommitTime = client.startCommit();
+      int numRecords = 10;
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 
numRecords);
+      JavaRDD<HoodieRecord> writeRecords = 
context().getJavaSparkContext().parallelize(records, 1);
+      List<WriteStatus> result = client.upsert(writeRecords, 
newCommitTime).collect();
+      client.commit(newCommitTime, jsc().parallelize(result));
+      Assertions.assertNoWriteErrors(result);
+
+      // Get a record key from the inserted records
+      String recordKey = records.get(0).getRecordKey();
+
+      // Connect to the table
+      new TableCommand().connect(tablePath, false, 0, 0, 0,
+          "WAIT_TO_ADJUST_SKEW", 200L, false);
+
+      // Verify record index is enabled in table config
+      HoodieTableMetaClient metaClient = createMetaClient(jsc(), tablePath);
+      
assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX));
+
+      // Validate entries in the Global RLI.
+      validateRecordIndexOutput(recordKey, Option.empty(), newCommitTime, 
DEFAULT_FIRST_PARTITION_PATH);
+    }
+  }
+
+  @Test
+  public void testGetRecordIndexInfoForNonGlobalRLI() throws Exception {
+    HoodieTableMetaClient.newTableBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+        .setTableName(tableName())
+        
.setArchiveLogFolder(HoodieTableConfig.TIMELINE_HISTORY_PATH.defaultValue())
+        .setPayloadClassName("org.apache.hudi.common.model.HoodieAvroPayload")
+        .setPartitionFields("partition_path")
+        .setRecordKeyFields("_row_key")
+        .setKeyGeneratorClassProp(SimpleKeyGenerator.class.getCanonicalName())
+        .initTable(HoodieCLI.conf.newInstance(), tablePath);
+
+    HoodieTestDataGenerator firstDataGen = new HoodieTestDataGenerator(new 
String[] {DEFAULT_FIRST_PARTITION_PATH});
+    HoodieTestDataGenerator secondDataGen = new HoodieTestDataGenerator(new 
String[] {DEFAULT_SECOND_PARTITION_PATH});
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+        .withEnableRecordLevelIndex(true).build();
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(tablePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withMetadataConfig(metadataConfig)
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context(), 
config)) {
+
+      String firstCommitTime = client.startCommit();
+      int numRecords = 10;
+      List<HoodieRecord> records = 
firstDataGen.generateInserts(firstCommitTime, numRecords);
+      JavaRDD<HoodieRecord> writeRecords = 
context().getJavaSparkContext().parallelize(records, 1);
+      List<WriteStatus> result = client.upsert(writeRecords, 
firstCommitTime).collect();
+      client.commit(firstCommitTime, jsc().parallelize(result));
+      Assertions.assertNoWriteErrors(result);
+
+      // Get a record key from the inserted records
+      String firstRecordKey = records.get(0).getRecordKey();
+      String firstPartitionPath = records.get(0).getPartitionPath();
+
+      String secondCommitTime = client.startCommit();
+      records = secondDataGen.generateInserts(secondCommitTime, numRecords);
+      writeRecords = context().getJavaSparkContext().parallelize(records, 1);
+      result = client.upsert(writeRecords, secondCommitTime).collect();
+      client.commit(secondCommitTime, jsc().parallelize(result));
+      Assertions.assertNoWriteErrors(result);
+
+      // Get a record key from the inserted records
+      String secondRecordKey = records.get(0).getRecordKey();
+      String secondPartitionPath = records.get(0).getPartitionPath();
+
+      // Connect to the table
+      new TableCommand().connect(tablePath, false, 0, 0, 0,
+          "WAIT_TO_ADJUST_SKEW", 200L, false);
+
+      // Verify record index is enabled in table config
+      HoodieTableMetaClient metaClient = createMetaClient(jsc(), tablePath);
+      
assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX));
+
+      // Validate entries in the Non-Global RLI.
+      validateRecordIndexOutput(firstRecordKey, Option.of(firstPartitionPath), 
firstCommitTime, DEFAULT_FIRST_PARTITION_PATH);
+      validateRecordIndexOutput(secondRecordKey, 
Option.of(secondPartitionPath), secondCommitTime, 
DEFAULT_SECOND_PARTITION_PATH);
+    }

Review Comment:
   Same comment as above: also look up a non-existent entry and validate the output.



##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java:
##########
@@ -387,6 +392,58 @@ public String validateFiles(
     return HoodiePrintHelper.print(header, new HashMap<>(), "", false, 
Integer.MAX_VALUE, false, rows);
   }
 
+  @ShellMethod(key = "metadata lookup-record-index", value = "Print Record 
index information for a record_key. "
+      + "For global RLI, only record key is required. For partitioned RLI, 
both record key and partition path are required.")
+  public String getRecordIndexInfo(
+      @ShellOption(value = "--record_key", help = "Record key entry whose info 
will be fetched")
+      final String recordKey,
+      @ShellOption(value = "--partition_path", help = "Partition path. 
Required for partitioned (non-global) Record Level Index.",
+          defaultValue = "") final String partitionPath) {
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    HoodieStorage storage = metaClient.getStorage();
+    HoodieMetadataConfig config = 
HoodieMetadataConfig.newBuilder().enable(true).build();
+    HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
+        new HoodieLocalEngineContext(HoodieCLI.conf), storage, config, 
HoodieCLI.basePath);
+
+    ValidationUtils.checkState(metaReader.enabled(), "[ERROR] Metadata Table 
not enabled/initialized\n\n");
+    
ValidationUtils.checkState(metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
+        "[ERROR] Record index partition is not enabled/initialized\n\n");
+
+    // Check if RLI is partitioned and validate partition_path is provided
+    if (config.isRecordLevelIndexEnabled() && 
!config.isGlobalRecordLevelIndexEnabled()) {
+      ValidationUtils.checkState(!StringUtils.isNullOrEmpty(partitionPath),
+          "[ERROR] Partitioned Record Level Index requires --partition_path to 
be provided\n\n");
+    }
+
+    Option<String> dataTablePartition = 
StringUtils.isNullOrEmpty(partitionPath) ? Option.empty() : 
Option.of(partitionPath);

Review Comment:
   Minor: we can simplify this as `Option.ofNullable(partitionPath)`.
   



##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java:
##########
@@ -387,6 +392,58 @@ public String validateFiles(
     return HoodiePrintHelper.print(header, new HashMap<>(), "", false, 
Integer.MAX_VALUE, false, rows);
   }
 
+  @ShellMethod(key = "metadata lookup-record-index", value = "Print Record 
index information for a record_key. "
+      + "For global RLI, only record key is required. For partitioned RLI, 
both record key and partition path are required.")
+  public String getRecordIndexInfo(
+      @ShellOption(value = "--record_key", help = "Record key entry whose info 
will be fetched")
+      final String recordKey,
+      @ShellOption(value = "--partition_path", help = "Partition path. 
Required for partitioned (non-global) Record Level Index.",
+          defaultValue = "") final String partitionPath) {
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    HoodieStorage storage = metaClient.getStorage();
+    HoodieMetadataConfig config = 
HoodieMetadataConfig.newBuilder().enable(true).build();
+    HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
+        new HoodieLocalEngineContext(HoodieCLI.conf), storage, config, 
HoodieCLI.basePath);
+
+    ValidationUtils.checkState(metaReader.enabled(), "[ERROR] Metadata Table 
not enabled/initialized\n\n");
+    
ValidationUtils.checkState(metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
+        "[ERROR] Record index partition is not enabled/initialized\n\n");
+
+    // Check if RLI is partitioned and validate partition_path is provided
+    if (config.isRecordLevelIndexEnabled() && 
!config.isGlobalRecordLevelIndexEnabled()) {

Review Comment:
   Sorry — for the Hudi CLI, we may not have all writer properties, 
so we can't really rely on `config.isGlobalRecordLevelIndexEnabled()`, and we 
can't rely on `HoodieMetadataConfig` alone. 
   
   That's why I suggested looking into the index definition, which the reader 
will have access to. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to