xx789633 commented on code in PR #1441:
URL: https://github.com/apache/fluss/pull/1441#discussion_r2293397453
##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java:
##########
@@ -46,4 +58,51 @@ public static Optional<Schema> getSchema(LanceConfig config) {
return Optional.empty();
}
}
+
+ public static long appendFragments(LanceConfig config, List<FragmentMetadata> fragments) {
+ FragmentOperation.Append appendOp = new FragmentOperation.Append(fragments);
+ String uri = config.getDatasetUri();
+ ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
+ try (Dataset datasetRead = Dataset.open(allocator, uri, options)) {
+ Dataset datasetWrite =
+ Dataset.commit(
+ allocator,
+ config.getDatasetUri(),
+ appendOp,
+ java.util.Optional.of(datasetRead.version()),
+ options.getStorageOptions());
+ long version = datasetWrite.version();
+ datasetWrite.close();
+ // Dataset.create returns version 1
+ return version - 1;
+ }
+ }
+
+ public static long commitAppend(
+ LanceConfig config, List<FragmentMetadata> fragments, Map<String, String> properties) {
+ String uri = config.getDatasetUri();
+ ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
+ try (Dataset dataset = Dataset.open(allocator, uri, options)) {
+ Transaction transaction =
+ dataset.newTransactionBuilder()
+ .operation(Append.builder().fragments(fragments).build())
+ .transactionProperties(properties)
+ .build();
+ try (Dataset appendedDataset = transaction.commit()) {
Review Comment:
OK, we just use the version returned by the Lance commit operation, without
any adjustment.