yihua commented on code in PR #12826:
URL: https://github.com/apache/hudi/pull/12826#discussion_r1970983549
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -156,59 +158,77 @@ public static Option<byte[]>
serializeCommitMetadata(CommitMetadataSerDe commitM
public static <T extends SpecificRecordBase> Option<byte[]>
serializeAvroMetadata(T metadata, Class<T> clazz)
throws IOException {
DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
- DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- fileWriter.create(metadata.getSchema(), baos);
- fileWriter.append(metadata);
- fileWriter.flush();
- return Option.of(baos.toByteArray());
+ try (DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter)) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ fileWriter.create(metadata.getSchema(), baos);
+ fileWriter.append(metadata);
+ fileWriter.flush();
+ return Option.of(baos.toByteArray());
+ }
+ }
+
+ public static HoodieCleanerPlan deserializeCleanerPlan(Option<InputStream>
in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCleanerPlan.class);
+ }
+
+ public static HoodieCompactionPlan
deserializeCompactionPlan(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCompactionPlan.class);
}
- public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws
IOException {
- return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class);
+ public static HoodieCompactionPlan deserializeCompactionPlanLegacy(byte[]
bytes) throws IOException {
Review Comment:
Should we file a follow-up JIRA to remove all APIs that operate on byte arrays?
Is this method still used, and if so, do we need to remove those usages as well?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -156,59 +158,77 @@ public static Option<byte[]>
serializeCommitMetadata(CommitMetadataSerDe commitM
public static <T extends SpecificRecordBase> Option<byte[]>
serializeAvroMetadata(T metadata, Class<T> clazz)
throws IOException {
DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
- DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- fileWriter.create(metadata.getSchema(), baos);
- fileWriter.append(metadata);
- fileWriter.flush();
- return Option.of(baos.toByteArray());
+ try (DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter)) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ fileWriter.create(metadata.getSchema(), baos);
+ fileWriter.append(metadata);
+ fileWriter.flush();
+ return Option.of(baos.toByteArray());
+ }
+ }
+
+ public static HoodieCleanerPlan deserializeCleanerPlan(Option<InputStream>
in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCleanerPlan.class);
+ }
+
+ public static HoodieCompactionPlan
deserializeCompactionPlan(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCompactionPlan.class);
}
- public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws
IOException {
- return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class);
+ public static HoodieCompactionPlan deserializeCompactionPlanLegacy(byte[]
bytes) throws IOException {
+ return deserializeAvroMetadataLegacy(bytes, HoodieCompactionPlan.class);
}
- public static HoodieCompactionPlan deserializeCompactionPlan(byte[] bytes)
throws IOException {
- return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class);
+ public static HoodieCleanMetadata
deserializeHoodieCleanMetadata(Option<InputStream> inputStream) throws
IOException {
+ return deserializeAvroMetadata(inputStream, HoodieCleanMetadata.class);
}
public static HoodieCleanMetadata deserializeHoodieCleanMetadata(byte[]
bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieCleanMetadata.class);
+ return deserializeAvroMetadataLegacy(bytes, HoodieCleanMetadata.class);
}
- public static HoodieRollbackMetadata
deserializeHoodieRollbackMetadata(byte[] bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieRollbackMetadata.class);
+ public static HoodieRollbackMetadata
deserializeHoodieRollbackMetadata(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieRollbackMetadata.class);
}
- public static HoodieRestoreMetadata deserializeHoodieRestoreMetadata(byte[]
bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieRestoreMetadata.class);
+ public static HoodieRestoreMetadata
deserializeHoodieRestoreMetadata(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieRestoreMetadata.class);
}
- public static HoodieSavepointMetadata
deserializeHoodieSavepointMetadata(byte[] bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
+ public static HoodieSavepointMetadata
deserializeHoodieSavepointMetadata(Option<InputStream> instantStream) throws
IOException {
+ return deserializeAvroMetadata(instantStream,
HoodieSavepointMetadata.class);
Review Comment:
nit: be consistent in argument naming. I see three variants: `in`,
`inputStream`, and `instantStream`.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -156,59 +158,77 @@ public static Option<byte[]>
serializeCommitMetadata(CommitMetadataSerDe commitM
public static <T extends SpecificRecordBase> Option<byte[]>
serializeAvroMetadata(T metadata, Class<T> clazz)
throws IOException {
DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
- DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- fileWriter.create(metadata.getSchema(), baos);
- fileWriter.append(metadata);
- fileWriter.flush();
- return Option.of(baos.toByteArray());
+ try (DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter)) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ fileWriter.create(metadata.getSchema(), baos);
+ fileWriter.append(metadata);
+ fileWriter.flush();
+ return Option.of(baos.toByteArray());
+ }
+ }
+
+ public static HoodieCleanerPlan deserializeCleanerPlan(Option<InputStream>
in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCleanerPlan.class);
+ }
+
+ public static HoodieCompactionPlan
deserializeCompactionPlan(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCompactionPlan.class);
}
- public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws
IOException {
- return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class);
+ public static HoodieCompactionPlan deserializeCompactionPlanLegacy(byte[]
bytes) throws IOException {
+ return deserializeAvroMetadataLegacy(bytes, HoodieCompactionPlan.class);
}
- public static HoodieCompactionPlan deserializeCompactionPlan(byte[] bytes)
throws IOException {
- return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class);
+ public static HoodieCleanMetadata
deserializeHoodieCleanMetadata(Option<InputStream> inputStream) throws
IOException {
+ return deserializeAvroMetadata(inputStream, HoodieCleanMetadata.class);
}
public static HoodieCleanMetadata deserializeHoodieCleanMetadata(byte[]
bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieCleanMetadata.class);
+ return deserializeAvroMetadataLegacy(bytes, HoodieCleanMetadata.class);
}
- public static HoodieRollbackMetadata
deserializeHoodieRollbackMetadata(byte[] bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieRollbackMetadata.class);
+ public static HoodieRollbackMetadata
deserializeHoodieRollbackMetadata(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieRollbackMetadata.class);
}
- public static HoodieRestoreMetadata deserializeHoodieRestoreMetadata(byte[]
bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieRestoreMetadata.class);
+ public static HoodieRestoreMetadata
deserializeHoodieRestoreMetadata(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieRestoreMetadata.class);
}
- public static HoodieSavepointMetadata
deserializeHoodieSavepointMetadata(byte[] bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
+ public static HoodieSavepointMetadata
deserializeHoodieSavepointMetadata(Option<InputStream> instantStream) throws
IOException {
+ return deserializeAvroMetadata(instantStream,
HoodieSavepointMetadata.class);
}
- public static HoodieRequestedReplaceMetadata
deserializeRequestedReplaceMetadata(byte[] bytes) throws IOException {
- return deserializeAvroMetadata(bytes,
HoodieRequestedReplaceMetadata.class);
+ public static HoodieRequestedReplaceMetadata
deserializeRequestedReplaceMetadata(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieRequestedReplaceMetadata.class);
}
- public static HoodieIndexPlan deserializeIndexPlan(byte[] bytes) throws
IOException {
- return deserializeAvroMetadata(bytes, HoodieIndexPlan.class);
+ public static HoodieIndexPlan deserializeIndexPlan(Option<InputStream> in)
throws IOException {
+ return deserializeAvroMetadata(in, HoodieIndexPlan.class);
}
- public static HoodieCommitMetadata deserializeCommitMetadata(byte[] bytes)
throws IOException {
- return deserializeAvroMetadata(bytes, HoodieCommitMetadata.class);
+ public static HoodieCommitMetadata
deserializeCommitMetadata(Option<InputStream> instantStream) throws IOException
{
+ return deserializeAvroMetadata(instantStream, HoodieCommitMetadata.class);
}
- public static HoodieReplaceCommitMetadata
deserializeReplaceCommitMetadata(byte[] bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieReplaceCommitMetadata.class);
+ public static HoodieReplaceCommitMetadata
deserializeReplaceCommitMetadata(Option<InputStream> instantStream) throws
IOException {
+ return deserializeAvroMetadata(instantStream,
HoodieReplaceCommitMetadata.class);
}
- public static <T extends SpecificRecordBase> T
deserializeAvroMetadata(byte[] bytes, Class<T> clazz)
+ public static <T extends SpecificRecordBase> T
deserializeAvroMetadataLegacy(byte[] bytes, Class<T> clazz)
Review Comment:
Let's file a follow-up JIRA to clean this up.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -156,59 +158,77 @@ public static Option<byte[]>
serializeCommitMetadata(CommitMetadataSerDe commitM
public static <T extends SpecificRecordBase> Option<byte[]>
serializeAvroMetadata(T metadata, Class<T> clazz)
throws IOException {
DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
- DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- fileWriter.create(metadata.getSchema(), baos);
- fileWriter.append(metadata);
- fileWriter.flush();
- return Option.of(baos.toByteArray());
+ try (DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter)) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ fileWriter.create(metadata.getSchema(), baos);
+ fileWriter.append(metadata);
+ fileWriter.flush();
+ return Option.of(baos.toByteArray());
+ }
+ }
+
+ public static HoodieCleanerPlan deserializeCleanerPlan(Option<InputStream>
in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCleanerPlan.class);
+ }
+
+ public static HoodieCompactionPlan
deserializeCompactionPlan(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCompactionPlan.class);
}
- public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws
IOException {
- return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class);
+ public static HoodieCompactionPlan deserializeCompactionPlanLegacy(byte[]
bytes) throws IOException {
+ return deserializeAvroMetadataLegacy(bytes, HoodieCompactionPlan.class);
}
- public static HoodieCompactionPlan deserializeCompactionPlan(byte[] bytes)
throws IOException {
- return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class);
+ public static HoodieCleanMetadata
deserializeHoodieCleanMetadata(Option<InputStream> inputStream) throws
IOException {
+ return deserializeAvroMetadata(inputStream, HoodieCleanMetadata.class);
}
public static HoodieCleanMetadata deserializeHoodieCleanMetadata(byte[]
bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieCleanMetadata.class);
+ return deserializeAvroMetadataLegacy(bytes, HoodieCleanMetadata.class);
Review Comment:
It looks to me like `deserializeHoodieCleanMetadata(byte[] bytes)` is no
longer used in the code base.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -156,59 +158,77 @@ public static Option<byte[]>
serializeCommitMetadata(CommitMetadataSerDe commitM
public static <T extends SpecificRecordBase> Option<byte[]>
serializeAvroMetadata(T metadata, Class<T> clazz)
throws IOException {
DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
- DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- fileWriter.create(metadata.getSchema(), baos);
- fileWriter.append(metadata);
- fileWriter.flush();
- return Option.of(baos.toByteArray());
+ try (DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter)) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ fileWriter.create(metadata.getSchema(), baos);
+ fileWriter.append(metadata);
+ fileWriter.flush();
+ return Option.of(baos.toByteArray());
+ }
+ }
+
+ public static HoodieCleanerPlan deserializeCleanerPlan(Option<InputStream>
in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCleanerPlan.class);
+ }
+
+ public static HoodieCompactionPlan
deserializeCompactionPlan(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCompactionPlan.class);
}
- public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws
IOException {
- return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class);
+ public static HoodieCompactionPlan deserializeCompactionPlanLegacy(byte[]
bytes) throws IOException {
+ return deserializeAvroMetadataLegacy(bytes, HoodieCompactionPlan.class);
}
- public static HoodieCompactionPlan deserializeCompactionPlan(byte[] bytes)
throws IOException {
- return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class);
+ public static HoodieCleanMetadata
deserializeHoodieCleanMetadata(Option<InputStream> inputStream) throws
IOException {
+ return deserializeAvroMetadata(inputStream, HoodieCleanMetadata.class);
}
public static HoodieCleanMetadata deserializeHoodieCleanMetadata(byte[]
bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieCleanMetadata.class);
+ return deserializeAvroMetadataLegacy(bytes, HoodieCleanMetadata.class);
Review Comment:
In this PR, let's remove the APIs that take or return a byte array (`byte[]`)
and are no longer used. Any further refactoring or removal can go in a
follow-up PR.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -156,59 +158,77 @@ public static Option<byte[]>
serializeCommitMetadata(CommitMetadataSerDe commitM
public static <T extends SpecificRecordBase> Option<byte[]>
serializeAvroMetadata(T metadata, Class<T> clazz)
throws IOException {
DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
- DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- fileWriter.create(metadata.getSchema(), baos);
- fileWriter.append(metadata);
- fileWriter.flush();
- return Option.of(baos.toByteArray());
+ try (DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter)) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ fileWriter.create(metadata.getSchema(), baos);
+ fileWriter.append(metadata);
+ fileWriter.flush();
+ return Option.of(baos.toByteArray());
+ }
+ }
+
+ public static HoodieCleanerPlan deserializeCleanerPlan(Option<InputStream>
in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCleanerPlan.class);
+ }
+
+ public static HoodieCompactionPlan
deserializeCompactionPlan(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCompactionPlan.class);
}
- public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws
IOException {
- return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class);
+ public static HoodieCompactionPlan deserializeCompactionPlanLegacy(byte[]
bytes) throws IOException {
+ return deserializeAvroMetadataLegacy(bytes, HoodieCompactionPlan.class);
}
- public static HoodieCompactionPlan deserializeCompactionPlan(byte[] bytes)
throws IOException {
- return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class);
+ public static HoodieCleanMetadata
deserializeHoodieCleanMetadata(Option<InputStream> inputStream) throws
IOException {
+ return deserializeAvroMetadata(inputStream, HoodieCleanMetadata.class);
}
public static HoodieCleanMetadata deserializeHoodieCleanMetadata(byte[]
bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieCleanMetadata.class);
+ return deserializeAvroMetadataLegacy(bytes, HoodieCleanMetadata.class);
}
- public static HoodieRollbackMetadata
deserializeHoodieRollbackMetadata(byte[] bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieRollbackMetadata.class);
+ public static HoodieRollbackMetadata
deserializeHoodieRollbackMetadata(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieRollbackMetadata.class);
}
- public static HoodieRestoreMetadata deserializeHoodieRestoreMetadata(byte[]
bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieRestoreMetadata.class);
+ public static HoodieRestoreMetadata
deserializeHoodieRestoreMetadata(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieRestoreMetadata.class);
}
- public static HoodieSavepointMetadata
deserializeHoodieSavepointMetadata(byte[] bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
+ public static HoodieSavepointMetadata
deserializeHoodieSavepointMetadata(Option<InputStream> instantStream) throws
IOException {
+ return deserializeAvroMetadata(instantStream,
HoodieSavepointMetadata.class);
}
- public static HoodieRequestedReplaceMetadata
deserializeRequestedReplaceMetadata(byte[] bytes) throws IOException {
- return deserializeAvroMetadata(bytes,
HoodieRequestedReplaceMetadata.class);
+ public static HoodieRequestedReplaceMetadata
deserializeRequestedReplaceMetadata(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieRequestedReplaceMetadata.class);
}
- public static HoodieIndexPlan deserializeIndexPlan(byte[] bytes) throws
IOException {
- return deserializeAvroMetadata(bytes, HoodieIndexPlan.class);
+ public static HoodieIndexPlan deserializeIndexPlan(Option<InputStream> in)
throws IOException {
+ return deserializeAvroMetadata(in, HoodieIndexPlan.class);
}
- public static HoodieCommitMetadata deserializeCommitMetadata(byte[] bytes)
throws IOException {
- return deserializeAvroMetadata(bytes, HoodieCommitMetadata.class);
+ public static HoodieCommitMetadata
deserializeCommitMetadata(Option<InputStream> instantStream) throws IOException
{
+ return deserializeAvroMetadata(instantStream, HoodieCommitMetadata.class);
}
- public static HoodieReplaceCommitMetadata
deserializeReplaceCommitMetadata(byte[] bytes) throws IOException {
- return deserializeAvroMetadata(bytes, HoodieReplaceCommitMetadata.class);
+ public static HoodieReplaceCommitMetadata
deserializeReplaceCommitMetadata(Option<InputStream> instantStream) throws
IOException {
+ return deserializeAvroMetadata(instantStream,
HoodieReplaceCommitMetadata.class);
}
- public static <T extends SpecificRecordBase> T
deserializeAvroMetadata(byte[] bytes, Class<T> clazz)
+ public static <T extends SpecificRecordBase> T
deserializeAvroMetadataLegacy(byte[] bytes, Class<T> clazz)
throws IOException {
DatumReader<T> reader = new SpecificDatumReader<>(clazz);
FileReader<T> fileReader = DataFileReader.openReader(new
SeekableByteArrayInput(bytes), reader);
ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize
metadata of type " + clazz);
return fileReader.next();
}
+
+ public static <T extends SpecificRecordBase> T
deserializeAvroMetadata(Option<InputStream> instantStream, Class<T> clazz)
+ throws IOException {
+ DatumReader<T> reader = new SpecificDatumReader<>(clazz);
+ try (DataFileStream<T> fileReader = new
DataFileStream<>(instantStream.get(), reader)) {
Review Comment:
Should there be an assertion on `instantStream.isPresent()` somewhere along
the call chain? Or, if the stream is always present, should the argument type
be changed from `Option<InputStream>` to `InputStream`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]