lokeshj1703 commented on code in PR #18132:
URL: https://github.com/apache/hudi/pull/18132#discussion_r3051288419


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -637,6 +627,510 @@ public void testSchemaEvolution(String tableType, boolean 
useUserProvidedSchema,
     defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
   }
 
+  @Test
+  public void testTimestampMillis() throws Exception {
+    String tableBasePath = basePath + "/testTimestampMillis";
+    defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
+    // Insert data produced with Schema A, pass Schema A
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
+        PROPS_FILENAME_TEST_SOURCE, false, true, false, null, 
HoodieTableType.MERGE_ON_READ.name());
+    cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + 
basePath + "/source-timestamp-millis.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + 
basePath + "/source-timestamp-millis.avsc");
+    cfg.configs.add(String.format("%s=%s", 
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+    cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
+
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(cfg.targetBasePath)
+        .setConf(new Configuration()).build();
+    TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
+    Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+    assertEquals("timestamp-millis", 
tableSchema.getField("current_ts").schema().getLogicalType().getName());
+    assertEquals(1000, 
sparkSession.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
 > '1980-01-01'").count());
+
+    cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
+        PROPS_FILENAME_TEST_SOURCE, false, true, false, null, 
HoodieTableType.MERGE_ON_READ.name());
+    cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + 
basePath + "/source-timestamp-millis.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + 
basePath + "/source-timestamp-millis.avsc");
+    cfg.configs.add(String.format("%s=%s", 
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+    cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
+
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    assertRecordCount(1450, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+    tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+    assertEquals("timestamp-millis", 
tableSchema.getField("current_ts").schema().getLogicalType().getName());
+    sqlContext.clearCache();
+    assertEquals(1450, 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
 > '1980-01-01'").count());
+    assertEquals(1450, 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
 < '2080-01-01'").count());
+    assertEquals(0, 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
 < '1980-01-01'").count());
+  }
+
+  @Test
+  public void testLogicalTypes() throws Exception {
+    try {
+      String tableBasePath = basePath + "/testLogicalTypes";
+      defaultSchemaProviderClassName = 
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.class.getName();
+
+      if (HoodieSparkUtils.isSpark3_3()) {
+        
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.sourceSchema = 
HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS;
+        
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.targetSchema = 
HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS;
+        AbstractBaseTestSource.schemaStr = 
HoodieTestDataGenerator.TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS;
+        AbstractBaseTestSource.avroSchema = 
HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS;
+      } else {
+        
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.sourceSchema = 
HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA;
+        
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.targetSchema = 
HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA;
+        AbstractBaseTestSource.schemaStr = 
HoodieTestDataGenerator.TRIP_LOGICAL_TYPES_SCHEMA;
+        AbstractBaseTestSource.avroSchema = 
HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA;
+      }
+
+      // Insert data produced with Schema A, pass Schema A
+      HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
+          PROPS_FILENAME_TEST_SOURCE, false, true, false, null, 
HoodieTableType.MERGE_ON_READ.name());
+      cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+      cfg.configs.add(String.format("%s=%s", 
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+      cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
+
+      new HoodieDeltaStreamer(cfg, jsc).sync();
+      assertRecordCount(1000, tableBasePath, sqlContext);
+      TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(cfg.targetBasePath)
+          .setConf(new Configuration()).build();
+      TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
+      Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      Map<String, String> hudiOpts = new HashMap<>();
+      hudiOpts.put("hoodie.datasource.write.recordkey.field", "id");
+      logicalAssertions(tableSchema, tableBasePath, hudiOpts, 
HoodieTableVersion.current().versionCode());
+
+      cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
+          PROPS_FILENAME_TEST_SOURCE, false, true, false, null, 
HoodieTableType.MERGE_ON_READ.name());
+      cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+      cfg.configs.add(String.format("%s=%s", 
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+      cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
+
+      new HoodieDeltaStreamer(cfg, jsc).sync();
+      assertRecordCount(1450, tableBasePath, sqlContext);
+      TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+      tableSchemaResolver = new TableSchemaResolver(metaClient);
+      tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      logicalAssertions(tableSchema, tableBasePath, hudiOpts, 
HoodieTableVersion.current().versionCode());
+    } finally {
+      defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
+      AbstractBaseTestSource.schemaStr = 
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+      AbstractBaseTestSource.avroSchema = HoodieTestDataGenerator.AVRO_SCHEMA;
+    }
+  }
+
+  private void logicalAssertions(Schema tableSchema, String tableBasePath, 
Map<String, String> hudiOpts, int tableVersion) {
+    assertEquals("timestamp-micros", 
tableSchema.getField("ts_micros").schema().getLogicalType().getName());
+    assertEquals("date", 
tableSchema.getField("event_date").schema().getLogicalType().getName());
+
+    sqlContext.clearCache();
+    Dataset<Row> df = sqlContext.read()
+        .options(hudiOpts)
+        .format("org.apache.hudi")
+        .load(tableBasePath);
+
+    long totalCount = df.count();
+    long expectedHalf = totalCount / 2;
+    long tolerance = totalCount / 20;
+    if (totalCount < 100) {
+      tolerance = totalCount / 4;
+    }
+
+    assertHalfSplit(df, "ts_micros > timestamp('2020-06-01 12:00:00Z')", 
expectedHalf, tolerance, "ts_micros > threshold");
+    assertHalfSplit(df, "ts_micros < timestamp('2020-06-01 12:00:00Z')", 
expectedHalf, tolerance, "ts_micros < threshold");
+    assertBoundaryCounts(df, "ts_micros > timestamp('2020-06-01 
12:00:00.000001Z')", "ts_micros <= timestamp('2020-06-01 12:00:00.000001Z')", 
totalCount);
+    assertBoundaryCounts(df, "ts_micros < timestamp('2020-06-01 
11:59:59.999999Z')", "ts_micros >= timestamp('2020-06-01 11:59:59.999999Z')", 
totalCount);
+
+    if (!HoodieSparkUtils.isSpark3_3()) {
+      assertHalfSplit(df, "local_ts_millis > CAST('2015-05-20 12:34:56' AS 
TIMESTAMP_NTZ)", expectedHalf, tolerance, "local_ts_millis > threshold");
+      assertHalfSplit(df, "local_ts_millis < CAST('2015-05-20 12:34:56' AS 
TIMESTAMP_NTZ)", expectedHalf, tolerance, "local_ts_millis < threshold");
+      assertBoundaryCounts(df, "local_ts_millis > CAST('2015-05-20 
12:34:56.001' AS TIMESTAMP_NTZ)", "local_ts_millis <= CAST('2015-05-20 
12:34:56.001' AS TIMESTAMP_NTZ)", totalCount);
+      assertBoundaryCounts(df, "local_ts_millis < CAST('2015-05-20 
12:34:55.999' AS TIMESTAMP_NTZ)", "local_ts_millis >= CAST('2015-05-20 
12:34:55.999' AS TIMESTAMP_NTZ)", totalCount);
+
+      assertHalfSplit(df, "local_ts_micros > CAST('2017-07-07 07:07:07' AS 
TIMESTAMP_NTZ)", expectedHalf, tolerance, "local_ts_micros > threshold");
+      assertHalfSplit(df, "local_ts_micros < CAST('2017-07-07 07:07:07' AS 
TIMESTAMP_NTZ)", expectedHalf, tolerance, "local_ts_micros < threshold");
+      assertBoundaryCounts(df, "local_ts_micros > CAST('2017-07-07 
07:07:07.000001' AS TIMESTAMP_NTZ)", "local_ts_micros <= CAST('2017-07-07 
07:07:07.000001' AS TIMESTAMP_NTZ)", totalCount);
+      assertBoundaryCounts(df, "local_ts_micros < CAST('2017-07-07 
07:07:06.999999' AS TIMESTAMP_NTZ)", "local_ts_micros >= CAST('2017-07-07 
07:07:06.999999' AS TIMESTAMP_NTZ)", totalCount);
+    }
+
+    assertHalfSplit(df, "event_date > date('2000-01-01')", expectedHalf, 
tolerance, "event_date > threshold");
+    assertHalfSplit(df, "event_date < date('2000-01-01')", expectedHalf, 
tolerance, "event_date < threshold");
+    assertBoundaryCounts(df, "event_date > date('2000-01-02')", "event_date <= 
date('2000-01-02')", totalCount);
+    assertBoundaryCounts(df, "event_date < date('1999-12-31')", "event_date >= 
date('1999-12-31')", totalCount);
+  }
+
+  private void assertHalfSplit(Dataset<Row> df, String filterExpr, long 
expectedHalf, long tolerance, String msg) {
+    long count = df.filter(filterExpr).count();
+    assertTrue(Math.abs(count - expectedHalf) <= tolerance, msg + " (got=" + 
count + ", expected=" + expectedHalf + ")");
+  }
+
+  private void assertBoundaryCounts(Dataset<Row> df, String exprZero, String 
exprTotal, long totalCount) {
+    assertEquals(0, df.filter(exprZero).count(), exprZero);
+    assertEquals(totalCount, df.filter(exprTotal).count(), exprTotal);
+  }
+
+  @ParameterizedTest
+  @CsvSource(value = {"SIX,AVRO,CLUSTER", "CURRENT,AVRO,NONE", 
"CURRENT,AVRO,CLUSTER", "CURRENT,SPARK,NONE", "CURRENT,SPARK,CLUSTER"})
+  void testCOWLogicalRepair(String tableVersion, String recordType, String 
operation) throws Throwable {
+    timestampNTZCompatibility(() -> {
+      try {
+        String dirName = "trips_logical_types_json_cow_write";
+        String dataPath = basePath + "/" + dirName;
+        java.nio.file.Path zipOutput = Paths.get(new URI(dataPath));
+        HoodieTestUtils.extractZipToDirectory("logical-repair/" + dirName + 
".zip", zipOutput, getClass());
+        String tableBasePath = zipOutput.toString();
+
+        TypedProperties properties = new TypedProperties();
+        String schemaPath = 
getClass().getClassLoader().getResource("logical-repair/schema.avsc").toURI().toString();
+        
properties.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
schemaPath);
+        
properties.setProperty("hoodie.streamer.schemaprovider.target.schema.file", 
schemaPath);
+        String inputDataPath = 
getClass().getClassLoader().getResource("logical-repair/cow_write_updates/2").toURI().toString();
+        properties.setProperty("hoodie.streamer.source.dfs.root", 
inputDataPath);
+        properties.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
+        properties.setProperty("hoodie.datasource.write.precombine.field", 
"timestamp");
+        properties.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
+        properties.setProperty("hoodie.datasource.write.keygenerator.class", 
"org.apache.hudi.keygen.SimpleKeyGenerator");
+        properties.setProperty("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
+        properties.setProperty("hoodie.compact.inline", "false");
+        properties.setProperty("hoodie.metatata.enable", "true");
+        properties.setProperty("hoodie.parquet.small.file.limit", "-1");
+        properties.setProperty("hoodie.cleaner.commits.retained", "10");
+        Option<TypedProperties> propt = Option.of(properties);
+
+        new HoodieStreamer(prepCfgForCowLogicalRepair(tableBasePath, "456"), 
jsc, propt).sync();
+        // Ensure files in batch 3 have a newer mtime than batch 2, since 
DFSSource uses mtime as checkpoint.
+        // Git does not preserve file modification times on checkout, so both 
files would otherwise have the same mtime.
+        File data3 = new 
File(getClass().getClassLoader().getResource("logical-repair/cow_write_updates/3/data.json").toURI());
+        File data2 = new 
File(getClass().getClassLoader().getResource("logical-repair/cow_write_updates/2/data.json").toURI());
+        data3.setLastModified(data2.lastModified() + 1000);
+        inputDataPath = 
getClass().getClassLoader().getResource("logical-repair/cow_write_updates/3").toURI().toString();
+        propt.get().setProperty("hoodie.streamer.source.dfs.root", 
inputDataPath);
+        if ("CLUSTER".equals(operation)) {
+          propt.get().setProperty("hoodie.clustering.inline", "true");
+          propt.get().setProperty("hoodie.clustering.inline.max.commits", "1");
+          
propt.get().setProperty("hoodie.clustering.plan.strategy.single.group.clustering.enabled",
 "true");
+          
propt.get().setProperty("hoodie.clustering.plan.strategy.sort.columns", 
"ts_millis,_row_key");
+        }
+        new HoodieStreamer(prepCfgForCowLogicalRepair(tableBasePath, "789"), 
jsc, propt).sync();
+        String prevTimezone = 
sparkSession.conf().get("spark.sql.session.timeZone");
+        try {
+          sparkSession.conf().set("spark.sql.session.timeZone", "UTC");
+          sparkSession.conf().set("spark.sql.parquet.enableVectorizedReader", 
"false");
+          Dataset<Row> df = 
sparkSession.read().format("hudi").load(tableBasePath);
+          assertDataframe(df, 15, 15, true);
+
+          if ("CLUSTER".equals(operation)) {
+            // after we cluster, the raw parquet should be correct
+
+            // Validate raw parquet files
+            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+                .setConf(hadoopConf)
+                .setBasePath(tableBasePath)
+                .build();
+
+            HoodieTimeline completedCommitsTimeline = 
metaClient.getCommitsTimeline().filterCompletedInstants();
+            Option<HoodieInstant> latestInstant = 
completedCommitsTimeline.lastInstant();
+            assertTrue(latestInstant.isPresent(), "No completed commits 
found");
+
+            List<String> baseFilePaths = 
collectLatestBaseFilePaths(metaClient);
+
+            assertEquals(4, baseFilePaths.size());
+
+            // Read raw parquet files
+            Dataset<Row> rawParquetDf = 
sparkSession.read().parquet(baseFilePaths.toArray(new String[0]));
+            assertDataframe(rawParquetDf, 15, 15, false);
+          }
+        } finally {
+          sparkSession.conf().set("spark.sql.session.timeZone", prevTimezone);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @ParameterizedTest
+  @CsvSource(value = {
+      "SIX,AVRO,CLUSTER,AVRO",

Review Comment:
   Addressed



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -637,6 +627,510 @@ public void testSchemaEvolution(String tableType, boolean 
useUserProvidedSchema,
     defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
   }
 
+  @Test
+  public void testTimestampMillis() throws Exception {
+    String tableBasePath = basePath + "/testTimestampMillis";
+    defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
+    // Insert data produced with Schema A, pass Schema A
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
+        PROPS_FILENAME_TEST_SOURCE, false, true, false, null, 
HoodieTableType.MERGE_ON_READ.name());
+    cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + 
basePath + "/source-timestamp-millis.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + 
basePath + "/source-timestamp-millis.avsc");
+    cfg.configs.add(String.format("%s=%s", 
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+    cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
+
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(cfg.targetBasePath)
+        .setConf(new Configuration()).build();
+    TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
+    Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+    assertEquals("timestamp-millis", 
tableSchema.getField("current_ts").schema().getLogicalType().getName());
+    assertEquals(1000, 
sparkSession.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
 > '1980-01-01'").count());
+
+    cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
+        PROPS_FILENAME_TEST_SOURCE, false, true, false, null, 
HoodieTableType.MERGE_ON_READ.name());
+    cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + 
basePath + "/source-timestamp-millis.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + 
basePath + "/source-timestamp-millis.avsc");
+    cfg.configs.add(String.format("%s=%s", 
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+    cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
+
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    assertRecordCount(1450, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+    tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+    assertEquals("timestamp-millis", 
tableSchema.getField("current_ts").schema().getLogicalType().getName());
+    sqlContext.clearCache();
+    assertEquals(1450, 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
 > '1980-01-01'").count());
+    assertEquals(1450, 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
 < '2080-01-01'").count());
+    assertEquals(0, 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
 < '1980-01-01'").count());
+  }
+
+  @Test
+  public void testLogicalTypes() throws Exception {
+    try {
+      String tableBasePath = basePath + "/testLogicalTypes";
+      defaultSchemaProviderClassName = 
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.class.getName();
+
+      if (HoodieSparkUtils.isSpark3_3()) {
+        
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.sourceSchema = 
HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS;
+        
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.targetSchema = 
HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS;
+        AbstractBaseTestSource.schemaStr = 
HoodieTestDataGenerator.TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS;
+        AbstractBaseTestSource.avroSchema = 
HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA_NO_LTS;
+      } else {
+        
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.sourceSchema = 
HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA;
+        
TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.targetSchema = 
HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA;
+        AbstractBaseTestSource.schemaStr = 
HoodieTestDataGenerator.TRIP_LOGICAL_TYPES_SCHEMA;
+        AbstractBaseTestSource.avroSchema = 
HoodieTestDataGenerator.AVRO_TRIP_LOGICAL_TYPES_SCHEMA;
+      }
+
+      // Insert data produced with Schema A, pass Schema A
+      HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
+          PROPS_FILENAME_TEST_SOURCE, false, true, false, null, 
HoodieTableType.MERGE_ON_READ.name());
+      cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+      cfg.configs.add(String.format("%s=%s", 
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+      cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
+
+      new HoodieDeltaStreamer(cfg, jsc).sync();
+      assertRecordCount(1000, tableBasePath, sqlContext);
+      TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(cfg.targetBasePath)
+          .setConf(new Configuration()).build();
+      TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
+      Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      Map<String, String> hudiOpts = new HashMap<>();
+      hudiOpts.put("hoodie.datasource.write.recordkey.field", "id");
+      logicalAssertions(tableSchema, tableBasePath, hudiOpts, 
HoodieTableVersion.current().versionCode());
+
+      cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
+          PROPS_FILENAME_TEST_SOURCE, false, true, false, null, 
HoodieTableType.MERGE_ON_READ.name());
+      cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+      cfg.configs.add(String.format("%s=%s", 
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+      cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
+
+      new HoodieDeltaStreamer(cfg, jsc).sync();
+      assertRecordCount(1450, tableBasePath, sqlContext);
+      TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+      tableSchemaResolver = new TableSchemaResolver(metaClient);
+      tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      logicalAssertions(tableSchema, tableBasePath, hudiOpts, 
HoodieTableVersion.current().versionCode());
+    } finally {
+      defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
+      AbstractBaseTestSource.schemaStr = 
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+      AbstractBaseTestSource.avroSchema = HoodieTestDataGenerator.AVRO_SCHEMA;
+    }
+  }
+
+  private void logicalAssertions(Schema tableSchema, String tableBasePath, 
Map<String, String> hudiOpts, int tableVersion) {
+    assertEquals("timestamp-micros", 
tableSchema.getField("ts_micros").schema().getLogicalType().getName());
+    assertEquals("date", 
tableSchema.getField("event_date").schema().getLogicalType().getName());
+
+    sqlContext.clearCache();
+    Dataset<Row> df = sqlContext.read()
+        .options(hudiOpts)
+        .format("org.apache.hudi")
+        .load(tableBasePath);
+
+    long totalCount = df.count();
+    long expectedHalf = totalCount / 2;
+    long tolerance = totalCount / 20;
+    if (totalCount < 100) {
+      tolerance = totalCount / 4;
+    }
+
+    assertHalfSplit(df, "ts_micros > timestamp('2020-06-01 12:00:00Z')", 
expectedHalf, tolerance, "ts_micros > threshold");
+    assertHalfSplit(df, "ts_micros < timestamp('2020-06-01 12:00:00Z')", 
expectedHalf, tolerance, "ts_micros < threshold");
+    assertBoundaryCounts(df, "ts_micros > timestamp('2020-06-01 
12:00:00.000001Z')", "ts_micros <= timestamp('2020-06-01 12:00:00.000001Z')", 
totalCount);
+    assertBoundaryCounts(df, "ts_micros < timestamp('2020-06-01 
11:59:59.999999Z')", "ts_micros >= timestamp('2020-06-01 11:59:59.999999Z')", 
totalCount);
+
+    if (!HoodieSparkUtils.isSpark3_3()) {
+      assertHalfSplit(df, "local_ts_millis > CAST('2015-05-20 12:34:56' AS 
TIMESTAMP_NTZ)", expectedHalf, tolerance, "local_ts_millis > threshold");
+      assertHalfSplit(df, "local_ts_millis < CAST('2015-05-20 12:34:56' AS 
TIMESTAMP_NTZ)", expectedHalf, tolerance, "local_ts_millis < threshold");
+      assertBoundaryCounts(df, "local_ts_millis > CAST('2015-05-20 
12:34:56.001' AS TIMESTAMP_NTZ)", "local_ts_millis <= CAST('2015-05-20 
12:34:56.001' AS TIMESTAMP_NTZ)", totalCount);
+      assertBoundaryCounts(df, "local_ts_millis < CAST('2015-05-20 
12:34:55.999' AS TIMESTAMP_NTZ)", "local_ts_millis >= CAST('2015-05-20 
12:34:55.999' AS TIMESTAMP_NTZ)", totalCount);
+
+      assertHalfSplit(df, "local_ts_micros > CAST('2017-07-07 07:07:07' AS 
TIMESTAMP_NTZ)", expectedHalf, tolerance, "local_ts_micros > threshold");
+      assertHalfSplit(df, "local_ts_micros < CAST('2017-07-07 07:07:07' AS 
TIMESTAMP_NTZ)", expectedHalf, tolerance, "local_ts_micros < threshold");
+      assertBoundaryCounts(df, "local_ts_micros > CAST('2017-07-07 
07:07:07.000001' AS TIMESTAMP_NTZ)", "local_ts_micros <= CAST('2017-07-07 
07:07:07.000001' AS TIMESTAMP_NTZ)", totalCount);
+      assertBoundaryCounts(df, "local_ts_micros < CAST('2017-07-07 
07:07:06.999999' AS TIMESTAMP_NTZ)", "local_ts_micros >= CAST('2017-07-07 
07:07:06.999999' AS TIMESTAMP_NTZ)", totalCount);
+    }
+
+    assertHalfSplit(df, "event_date > date('2000-01-01')", expectedHalf, 
tolerance, "event_date > threshold");
+    assertHalfSplit(df, "event_date < date('2000-01-01')", expectedHalf, 
tolerance, "event_date < threshold");
+    assertBoundaryCounts(df, "event_date > date('2000-01-02')", "event_date <= 
date('2000-01-02')", totalCount);
+    assertBoundaryCounts(df, "event_date < date('1999-12-31')", "event_date >= 
date('1999-12-31')", totalCount);
+  }
+
+  private void assertHalfSplit(Dataset<Row> df, String filterExpr, long 
expectedHalf, long tolerance, String msg) {
+    long count = df.filter(filterExpr).count();
+    assertTrue(Math.abs(count - expectedHalf) <= tolerance, msg + " (got=" + 
count + ", expected=" + expectedHalf + ")");
+  }
+
+  private void assertBoundaryCounts(Dataset<Row> df, String exprZero, String 
exprTotal, long totalCount) {
+    assertEquals(0, df.filter(exprZero).count(), exprZero);
+    assertEquals(totalCount, df.filter(exprTotal).count(), exprTotal);
+  }
+
+  @ParameterizedTest
+  @CsvSource(value = {"SIX,AVRO,CLUSTER", "CURRENT,AVRO,NONE", 
"CURRENT,AVRO,CLUSTER", "CURRENT,SPARK,NONE", "CURRENT,SPARK,CLUSTER"})

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to