luoyuxia commented on code in PR #1441:
URL: https://github.com/apache/fluss/pull/1441#discussion_r2293247784
##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java:
##########
@@ -46,4 +58,51 @@ public static Optional<Schema> getSchema(LanceConfig config) {
             return Optional.empty();
         }
     }
+
+    public static long appendFragments(LanceConfig config, List<FragmentMetadata> fragments) {
+        FragmentOperation.Append appendOp = new FragmentOperation.Append(fragments);
+        String uri = config.getDatasetUri();
+        ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
+        try (Dataset datasetRead = Dataset.open(allocator, uri, options)) {
+            Dataset datasetWrite =
+                    Dataset.commit(
+                            allocator,
+                            config.getDatasetUri(),
+                            appendOp,
+                            java.util.Optional.of(datasetRead.version()),
+                            options.getStorageOptions());
+            long version = datasetWrite.version();
+            datasetWrite.close();
+            // Dataset.create returns version 1
+            return version - 1;
+        }
+    }
+
+    public static long commitAppend(
+            LanceConfig config, List<FragmentMetadata> fragments, Map<String, String> properties) {
+        String uri = config.getDatasetUri();
+        ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
+        try (Dataset dataset = Dataset.open(allocator, uri, options)) {
+            Transaction transaction =
+                    dataset.newTransactionBuilder()
+                            .operation(Append.builder().fragments(fragments).build())
+                            .transactionProperties(properties)
+                            .build();
+            try (Dataset appendedDataset = transaction.commit()) {
Review Comment:
Forcing `version - 1` looks strange to me. What's the problem with using the version directly?
It will be error-prone in union read, because we have to remember to add 1 to the version we get from Fluss's ZK.
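To make the off-by-one risk concrete, here is a minimal, self-contained sketch. It does not call Lance or Fluss: the class `LanceVersionMappingSketch`, its methods, and the in-memory map standing in for the lake snapshot id that Fluss keeps in ZK are all hypothetical. It assumes, per the comment in the diff, that `Dataset.create` yields version 1, so the first append commits version 2.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: contrasts storing "Lance version - 1" as the lake snapshot id
 * with storing the Lance version as-is. No Lance or Fluss APIs are used; the map
 * below only stands in for the snapshot id that Fluss records in ZK.
 */
public class LanceVersionMappingSketch {

    // Stand-in for the per-table lake snapshot id kept in Fluss's ZK (hypothetical).
    private final Map<String, Long> snapshotIdByTable = new HashMap<>();

    /** Approach in the PR: record the committed Lance version minus 1. */
    public void recordWithOffset(String table, long committedLanceVersion) {
        snapshotIdByTable.put(table, committedLanceVersion - 1);
    }

    /** Suggested approach: record the committed Lance version unchanged. */
    public void recordDirect(String table, long committedLanceVersion) {
        snapshotIdByTable.put(table, committedLanceVersion);
    }

    /** Returns whatever id was recorded for the table. */
    public long storedSnapshotId(String table) {
        return snapshotIdByTable.get(table);
    }

    /** Union read with the offset: every reader must remember to add the 1 back. */
    public long lanceVersionToReadWithOffset(String table) {
        return storedSnapshotId(table) + 1;
    }

    /** Union read without the offset: the stored id already is the Lance version. */
    public long lanceVersionToReadDirect(String table) {
        return storedSnapshotId(table);
    }

    public static void main(String[] args) {
        // Assumption: Dataset.create produced version 1, the first append committed version 2.
        long committed = 2L;

        LanceVersionMappingSketch offset = new LanceVersionMappingSketch();
        offset.recordWithOffset("t1", committed);
        // Forgetting the "+ 1" would make the reader check out version 1, the pre-append state.
        System.out.println("offset: stored=" + offset.storedSnapshotId("t1")
                + ", read=" + offset.lanceVersionToReadWithOffset("t1"));

        LanceVersionMappingSketch direct = new LanceVersionMappingSketch();
        direct.recordDirect("t1", committed);
        System.out.println("direct: stored=" + direct.storedSnapshotId("t1")
                + ", read=" + direct.lanceVersionToReadDirect("t1"));
    }
}
```

With the offset, every union-read path has to carry the same `+ 1` conversion; recording the Lance version directly removes that conversion entirely.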
##########
fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java:
##########
@@ -187,4 +222,66 @@ protected ArrowType defaultMethod(DataType dataType) {
                             "Unsupported data type %s currently.",
                             dataType.asSummaryString()));
         }
     }
+
+    private static int getPrecision(DecimalVector decimalVector) {
+        int precision = -1;
+        try {
+            java.lang.reflect.Field precisionField =
+                    decimalVector.getClass().getDeclaredField("precision");
+            precisionField.setAccessible(true);
+            precision = (int) precisionField.get(decimalVector);
Review Comment:
So we don't need to use reflection. Please don't forget this comment of mine.
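For reference, Arrow Java's `DecimalVector` exposes its precision and scale through public getters, and the same information is available from the vector's `ArrowType.Decimal` field type, so the reflective lookup above can likely be replaced by a direct call. The sketch below assumes `arrow-vector` plus an Arrow memory implementation (for example `arrow-memory-netty` or `arrow-memory-unsafe`) on the classpath; the class `DecimalPrecisionSketch` and its helper names are illustrative only and are not necessarily what the earlier review comment proposed.

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.types.pojo.ArrowType;

/** Illustrative helpers: read decimal precision without reflection. */
public class DecimalPrecisionSketch {

    /** Uses DecimalVector's public getter instead of a private field. */
    public static int getPrecision(DecimalVector decimalVector) {
        return decimalVector.getPrecision();
    }

    /** Alternative: derive the precision from the vector's Arrow field type. */
    public static int getPrecisionFromField(DecimalVector decimalVector) {
        ArrowType.Decimal decimalType = (ArrowType.Decimal) decimalVector.getField().getType();
        return decimalType.getPrecision();
    }

    public static void main(String[] args) {
        // The vector is closed before the allocator when the try block exits.
        try (BufferAllocator allocator = new RootAllocator();
                DecimalVector vector = new DecimalVector("amount", allocator, 10, 2)) {
            System.out.println(getPrecision(vector)); // 10
            System.out.println(getPrecisionFromField(vector)); // 10
            System.out.println(vector.getScale()); // 2
        }
    }
}
```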
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.