the-other-tim-brown commented on code in PR #13628:
URL: https://github.com/apache/hudi/pull/13628#discussion_r2246177569
##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java:
##########
@@ -113,4 +132,65 @@ public void commitToTable(List<HoodieRecord> recordList,
String operation, boole
}
}
}
+
+ @Override
+ public void commitSchemaToTable(InternalSchema schema, Map<String, String>
writeConfigs, String historySchemaStr) {
+ String tableName =
writeConfigs.get(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
+ Schema avroSchema = AvroInternalSchemaConverter.convert(schema,
getAvroRecordQualifiedName(tableName));
+
+ StorageConfiguration<?> storageConf = getStorageConf();
+ String basePath = getBasePath();
+
+ Map<String, String> finalWriteConfigs = new HashMap<>(writeConfigs);
+ finalWriteConfigs.put(HoodieCleanConfig.AUTO_CLEAN.key(), "false");
+ finalWriteConfigs.put(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+ HoodieFailedWritesCleaningPolicy.NEVER.name());
+ finalWriteConfigs.put(HoodieArchivalConfig.AUTO_ARCHIVE.key(), "false");
+
+ HoodieJavaClientTestHarness.TestJavaTaskContextSupplier
taskContextSupplier = new
HoodieJavaClientTestHarness.TestJavaTaskContextSupplier();
+ HoodieJavaEngineContext context = new
HoodieJavaEngineContext(getStorageConf(), taskContextSupplier);
+
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withSchema(avroSchema.toString())
+ .withEngineType(EngineType.JAVA)
+ .withProps(finalWriteConfigs)
+ .build();
+
+ try (HoodieJavaWriteClient<?> client = new
HoodieJavaWriteClient<>(context, config)) {
+
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf)
+ .setBasePath(basePath)
+ .setTimeGeneratorConfig(config.getTimeGeneratorConfig())
+ .build();
+
+ WriteOperationType operationType = WriteOperationType.ALTER_SCHEMA;
+ String commitActionType = CommitUtils.getCommitActionType(operationType,
metaClient.getTableType());
+
+ String instantTime = client.startCommit(commitActionType);
+ client.setOperationType(operationType);
+
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ TimelineLayout layout = metaClient.getTimelineLayout();
+ HoodieInstant requested = layout.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.REQUESTED, commitActionType,
instantTime);
+
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.setOperationType(operationType);
+
+ timeline.transitionRequestedToInflight(requested, Option.of(metadata));
+
+ Map<String, String> extraMeta = new HashMap<>();
+ long schemaId = Long.parseLong(instantTime);
+ InternalSchema withId = schema.setSchemaId(schemaId);
+ extraMeta.put(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(withId));
Review Comment:
This is not a big deal here, but generally it's good to get in the habit of
using `Collections.singletonMap` when making maps with a single element. It
helps cut down on the overhead of the map. This ends up being a big win when
dealing with anything that happens many times, like a per-row operation, so
the habit is worth building.
##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java:
##########
@@ -113,4 +132,65 @@ public void commitToTable(List<HoodieRecord> recordList,
String operation, boole
}
}
}
+
+ @Override
+ public void commitSchemaToTable(InternalSchema schema, Map<String, String>
writeConfigs, String historySchemaStr) {
+ String tableName =
writeConfigs.get(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
+ Schema avroSchema = AvroInternalSchemaConverter.convert(schema,
getAvroRecordQualifiedName(tableName));
+
+ StorageConfiguration<?> storageConf = getStorageConf();
+ String basePath = getBasePath();
+
+ Map<String, String> finalWriteConfigs = new HashMap<>(writeConfigs);
+ finalWriteConfigs.put(HoodieCleanConfig.AUTO_CLEAN.key(), "false");
+ finalWriteConfigs.put(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+ HoodieFailedWritesCleaningPolicy.NEVER.name());
+ finalWriteConfigs.put(HoodieArchivalConfig.AUTO_ARCHIVE.key(), "false");
+
+ HoodieJavaClientTestHarness.TestJavaTaskContextSupplier
taskContextSupplier = new
HoodieJavaClientTestHarness.TestJavaTaskContextSupplier();
+ HoodieJavaEngineContext context = new
HoodieJavaEngineContext(getStorageConf(), taskContextSupplier);
+
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withSchema(avroSchema.toString())
+ .withEngineType(EngineType.JAVA)
+ .withProps(finalWriteConfigs)
+ .build();
+
+ try (HoodieJavaWriteClient<?> client = new
HoodieJavaWriteClient<>(context, config)) {
+
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf)
+ .setBasePath(basePath)
+ .setTimeGeneratorConfig(config.getTimeGeneratorConfig())
+ .build();
+
+ WriteOperationType operationType = WriteOperationType.ALTER_SCHEMA;
+ String commitActionType = CommitUtils.getCommitActionType(operationType,
metaClient.getTableType());
+
+ String instantTime = client.startCommit(commitActionType);
+ client.setOperationType(operationType);
+
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ TimelineLayout layout = metaClient.getTimelineLayout();
+ HoodieInstant requested = layout.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.REQUESTED, commitActionType,
instantTime);
+
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.setOperationType(operationType);
+
+ timeline.transitionRequestedToInflight(requested, Option.of(metadata));
+
+ Map<String, String> extraMeta = new HashMap<>();
+ long schemaId = Long.parseLong(instantTime);
+ InternalSchema withId = schema.setSchemaId(schemaId);
+ extraMeta.put(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(withId));
+
+ FileBasedInternalSchemaStorageManager schemaManager = new
FileBasedInternalSchemaStorageManager(metaClient);
+ schemaManager.persistHistorySchemaStr(instantTime,
+ SerDeHelper.inheritSchemas(schema, historySchemaStr));
+
+ assertTrue(client.commit(instantTime, new ArrayList<>(),
Option.of(extraMeta)));
Review Comment:
Similarly here, `Collections.emptyList()` will return a shared constant, while
`new ArrayList<>()` allocates a new list on every call, causing some overhead.
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1379,6 +1379,88 @@ public static boolean hasListOrMapField(Schema schema) {
}
}
+ public static void validateRecordsHaveSameData(Object expected, Object
actual) {
+ validateRecordsHaveSameData(expected, actual, new LinkedList<>());
+ }
+
+ private static void validateRecordsHaveSameData(Object expected, Object
actual, Deque<String> fieldNames) {
+ if (expected instanceof GenericRecord) {
+ if (!(actual instanceof GenericRecord)) {
+ throw new HoodieAvroSchemaException("Expected record but got " +
actual.getClass().getName() + " for " + createFullName(fieldNames));
+ }
+ GenericRecord expectedRecord = (GenericRecord) expected;
+ GenericRecord actualRecord = (GenericRecord) actual;
+ // TODO: add this check when schema evolution has the more flexible
schema comparison
+ // if (!Objects.equals(expectedRecord.getSchema(),
actualRecord.getSchema())) {
+ // throw new HoodieAvroSchemaException("Expected record schema " +
expectedRecord.getSchema() + " but got " + actualRecord.getSchema() + " for " +
createFullName(fieldNames));
+ // }
+ for (Schema.Field field : expectedRecord.getSchema().getFields()) {
+ fieldNames.push(field.name());
+ validateRecordsHaveSameData(expectedRecord.get(field.name()),
actualRecord.get(field.name()), fieldNames);
+ fieldNames.pop();
+ }
+ } else if (expected instanceof Collection) {
+ if (!(actual instanceof Collection)) {
+ throw new HoodieAvroSchemaException("Expected collection but got " +
actual.getClass().getName());
+ }
+ Collection expectedCollection = (Collection) expected;
+ Collection actualCollection = (Collection) actual;
+ if (expectedCollection.size() != actualCollection.size()) {
+ throw new HoodieAvroSchemaException("Expected collection size " +
expectedCollection.size() + " but got " + actualCollection.size() + " for " +
createFullName(fieldNames));
+ }
+ Iterator<?> expectedIterator = expectedCollection.iterator();
+ Iterator<?> actualIterator = actualCollection.iterator();
+ fieldNames.push("element");
+ while (expectedIterator.hasNext() && actualIterator.hasNext()) {
+ validateRecordsHaveSameData(expectedIterator.next(),
actualIterator.next(), fieldNames);
+ }
+ fieldNames.pop();
+ } else if (expected instanceof Map) {
+ if (!(actual instanceof Map)) {
+ throw new HoodieAvroSchemaException("Expected map but got " +
actual.getClass().getName() + " for " + createFullName(fieldNames));
+ }
+ Map expectedMap = (Map) expected;
+ Map actualMap = (Map) actual;
+ if (expectedMap.size() != actualMap.size()) {
+ throw new HoodieAvroSchemaException("Expected map size " +
expectedMap.size() + " but got " + actualMap.size() + " for " +
createFullName(fieldNames));
+ }
+ if (!expectedMap.keySet().equals(actualMap.keySet())) {
+ Set<String> expectedKeys = (Set<String>)
expectedMap.keySet().stream().map(Object::toString).collect(Collectors.toSet());
+ Set<String> actualKeys = (Set<String>)
actualMap.keySet().stream().map(Object::toString).collect(Collectors.toSet());
+ if (!expectedKeys.equals(actualKeys)) {
+ throw new HoodieAvroSchemaException("Expected map keys " +
expectedMap.keySet() + " but got " + actualMap.keySet() + " for " +
createFullName(fieldNames));
+ } else {
+ Map<String, Object> realExpectedMap = (Map<String, Object>)
expectedMap.entrySet().stream()
Review Comment:
Why is this step necessary?
##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java:
##########
@@ -113,4 +132,65 @@ public void commitToTable(List<HoodieRecord> recordList,
String operation, boole
}
}
}
+
+ @Override
+ public void commitSchemaToTable(InternalSchema schema, Map<String, String>
writeConfigs, String historySchemaStr) {
Review Comment:
Is there any reason we need each engine to implement this? Could we have
each implementation just rely on the Java writer?
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1379,6 +1379,88 @@ public static boolean hasListOrMapField(Schema schema) {
}
}
+ public static void validateRecordsHaveSameData(Object expected, Object
actual) {
+ validateRecordsHaveSameData(expected, actual, new LinkedList<>());
+ }
+
+ private static void validateRecordsHaveSameData(Object expected, Object
actual, Deque<String> fieldNames) {
+ if (expected instanceof GenericRecord) {
+ if (!(actual instanceof GenericRecord)) {
+ throw new HoodieAvroSchemaException("Expected record but got " +
actual.getClass().getName() + " for " + createFullName(fieldNames));
+ }
+ GenericRecord expectedRecord = (GenericRecord) expected;
+ GenericRecord actualRecord = (GenericRecord) actual;
+ // TODO: add this check when schema evolution has the more flexible
schema comparison
+ // if (!Objects.equals(expectedRecord.getSchema(),
actualRecord.getSchema())) {
+ // throw new HoodieAvroSchemaException("Expected record schema " +
expectedRecord.getSchema() + " but got " + actualRecord.getSchema() + " for " +
createFullName(fieldNames));
+ // }
+ for (Schema.Field field : expectedRecord.getSchema().getFields()) {
+ fieldNames.push(field.name());
+ validateRecordsHaveSameData(expectedRecord.get(field.name()),
actualRecord.get(field.name()), fieldNames);
+ fieldNames.pop();
+ }
+ } else if (expected instanceof Collection) {
+ if (!(actual instanceof Collection)) {
+ throw new HoodieAvroSchemaException("Expected collection but got " +
actual.getClass().getName());
+ }
+ Collection expectedCollection = (Collection) expected;
+ Collection actualCollection = (Collection) actual;
+ if (expectedCollection.size() != actualCollection.size()) {
+ throw new HoodieAvroSchemaException("Expected collection size " +
expectedCollection.size() + " but got " + actualCollection.size() + " for " +
createFullName(fieldNames));
+ }
+ Iterator<?> expectedIterator = expectedCollection.iterator();
+ Iterator<?> actualIterator = actualCollection.iterator();
+ fieldNames.push("element");
+ while (expectedIterator.hasNext() && actualIterator.hasNext()) {
+ validateRecordsHaveSameData(expectedIterator.next(),
actualIterator.next(), fieldNames);
+ }
+ fieldNames.pop();
+ } else if (expected instanceof Map) {
+ if (!(actual instanceof Map)) {
+ throw new HoodieAvroSchemaException("Expected map but got " +
actual.getClass().getName() + " for " + createFullName(fieldNames));
+ }
+ Map expectedMap = (Map) expected;
+ Map actualMap = (Map) actual;
+ if (expectedMap.size() != actualMap.size()) {
+ throw new HoodieAvroSchemaException("Expected map size " +
expectedMap.size() + " but got " + actualMap.size() + " for " +
createFullName(fieldNames));
+ }
+ if (!expectedMap.keySet().equals(actualMap.keySet())) {
+ Set<String> expectedKeys = (Set<String>)
expectedMap.keySet().stream().map(Object::toString).collect(Collectors.toSet());
+ Set<String> actualKeys = (Set<String>)
actualMap.keySet().stream().map(Object::toString).collect(Collectors.toSet());
+ if (!expectedKeys.equals(actualKeys)) {
Review Comment:
Won't this always be true? The map keys in Avro are limited to strings.
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1379,6 +1379,88 @@ public static boolean hasListOrMapField(Schema schema) {
}
}
+ public static void validateRecordsHaveSameData(Object expected, Object
actual) {
Review Comment:
This is used by test code; can we move it to a test class?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]