This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push: new 9cdc9b7ef [lake/lance] Lance lake writer and committer implementation (#1441) 9cdc9b7ef is described below commit 9cdc9b7ef596996a86b45ec472a71d9dca42e68c Author: xx789 <xx7896...@163.com> AuthorDate: Mon Aug 25 10:51:19 2025 +0800 [lake/lance] Lance lake writer and committer implementation (#1441) --- fluss-lake/fluss-lake-lance/pom.xml | 2 +- .../com/alibaba/fluss/lake/lance/LanceConfig.java | 54 +--- .../alibaba/fluss/lake/lance/LanceLakeCatalog.java | 24 +- .../alibaba/fluss/lake/lance/LanceLakeStorage.java | 3 +- .../fluss/lake/lance/tiering/ArrowWriter.java | 74 +++++ .../fluss/lake/lance/tiering/LanceArrowWriter.java | 130 +++++++++ .../lance/tiering/LanceCommittableSerializer.java | 67 +++++ .../lake/lance/tiering/LanceLakeCommitter.java | 162 +++++++++++ .../lance/tiering/LanceLakeTieringFactory.java | 60 ++++ .../fluss/lake/lance/tiering/LanceLakeWriter.java | 94 ++++++ .../lance/tiering/LanceWriteResultSerializer.java | 55 ++++ .../fluss/lake/lance/utils/LanceArrowUtils.java | 88 ++++++ .../lake/lance/utils/LanceDatasetAdapter.java | 40 +++ .../lake/lance/writers/ArrowBigIntWriter.java | 52 ++++ .../lake/lance/writers/ArrowBinaryWriter.java | 57 ++++ .../lake/lance/writers/ArrowBooleanWriter.java | 54 ++++ .../fluss/lake/lance/writers/ArrowDateWriter.java | 54 ++++ .../lake/lance/writers/ArrowDecimalWriter.java | 70 +++++ .../lake/lance/writers/ArrowDoubleWriter.java | 54 ++++ .../fluss/lake/lance/writers/ArrowFieldWriter.java | 63 ++++ .../fluss/lake/lance/writers/ArrowFloatWriter.java | 53 ++++ .../fluss/lake/lance/writers/ArrowIntWriter.java | 54 ++++ .../lake/lance/writers/ArrowSmallIntWriter.java | 54 ++++ .../fluss/lake/lance/writers/ArrowTimeWriter.java | 90 ++++++ .../lance/writers/ArrowTimestampLtzWriter.java | 99 +++++++ .../lance/writers/ArrowTimestampNtzWriter.java | 97 +++++++ .../lake/lance/writers/ArrowTinyIntWriter.java | 56 ++++ 
.../lake/lance/writers/ArrowVarBinaryWriter.java | 52 ++++ .../lake/lance/writers/ArrowVarCharWriter.java | 56 ++++ .../src/main/resources/META-INF/NOTICE | 2 +- .../resources/META-INF/licenses/LICENSE.zstd-jni | 26 -- .../lake/lance/LakeEnabledTableCreateITCase.java | 27 +- .../lance/testutils/FlinkLanceTieringTestBase.java | 219 ++++++++++++++ .../lake/lance/tiering/LanceTieringITCase.java | 148 ++++++++++ .../fluss/lake/lance/tiering/LanceTieringTest.java | 318 +++++++++++++++++++++ .../testutils/FlinkPaimonTieringTestBase.java | 1 - 36 files changed, 2497 insertions(+), 112 deletions(-) diff --git a/fluss-lake/fluss-lake-lance/pom.xml b/fluss-lake/fluss-lake-lance/pom.xml index 65715230d..960a3d4db 100644 --- a/fluss-lake/fluss-lake-lance/pom.xml +++ b/fluss-lake/fluss-lake-lance/pom.xml @@ -33,7 +33,7 @@ <packaging>jar</packaging> <properties> - <lance.version>0.26.1</lance.version> + <lance.version>0.33.0</lance.version> </properties> <dependencies> diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java index 5d7218d89..240b387e6 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java @@ -32,17 +32,9 @@ public class LanceConfig implements Serializable { private static final String block_size = "block_size"; private static final String version = "version"; - private static final String index_cache_size = "index_cache_size"; - private static final String metadata_cache_size = "metadata_cache_size"; private static final String max_row_per_file = "max_row_per_file"; private static final String max_rows_per_group = "max_rows_per_group"; private static final String max_bytes_per_file = "max_bytes_per_file"; - private static final String ak = "access_key_id"; - private static final String sk = 
"secret_access_key"; - private static final String endpoint = "aws_endpoint"; - private static final String region = "aws_region"; - private static final String virtual_hosted_style = "virtual_hosted_style_request"; - private static final String allow_http = "allow_http"; private static final String batch_size = "batch_size"; private static final String warehouse = "warehouse"; @@ -64,11 +56,17 @@ public class LanceConfig implements Serializable { } public static LanceConfig from( - Map<String, String> properties, String databaseName, String tableName) { - if (!properties.containsKey(warehouse)) { + Map<String, String> clusterConf, + Map<String, String> tableCustomProperties, + String databaseName, + String tableName) { + if (!clusterConf.containsKey(warehouse)) { throw new IllegalArgumentException("Missing required option " + warehouse); } - return new LanceConfig(databaseName, tableName, properties.get(warehouse), properties); + Map<String, String> options = new HashMap<>(); + options.putAll(clusterConf); + options.putAll(tableCustomProperties); + return new LanceConfig(databaseName, tableName, clusterConf.get(warehouse), options); } public static int getBatchSize(LanceConfig config) { @@ -83,14 +81,6 @@ public class LanceConfig implements Serializable { return options; } - public String getDatabaseName() { - return databaseName; - } - - public String getTableName() { - return tableName; - } - public String getDatasetUri() { return datasetUri; } @@ -104,12 +94,6 @@ public class LanceConfig implements Serializable { if (maps.containsKey(version)) { builder.setVersion(Integer.parseInt(maps.get(version))); } - if (maps.containsKey(index_cache_size)) { - builder.setIndexCacheSize(Integer.parseInt(maps.get(index_cache_size))); - } - if (maps.containsKey(metadata_cache_size)) { - builder.setMetadataCacheSize(Integer.parseInt(maps.get(metadata_cache_size))); - } builder.setStorageOptions(genStorageOptions(config)); return builder.build(); } @@ -130,24 +114,8 @@ public 
class LanceConfig implements Serializable { return builder.build(); } - private static Map<String, String> genStorageOptions(LanceConfig config) { - Map<String, String> storageOptions = new HashMap<>(); - Map<String, String> maps = config.getOptions(); - if (maps.containsKey(ak) && maps.containsKey(sk) && maps.containsKey(endpoint)) { - storageOptions.put(ak, maps.get(ak)); - storageOptions.put(sk, maps.get(sk)); - storageOptions.put(endpoint, maps.get(endpoint)); - } - if (maps.containsKey(region)) { - storageOptions.put(region, maps.get(region)); - } - if (maps.containsKey(virtual_hosted_style)) { - storageOptions.put(virtual_hosted_style, maps.get(virtual_hosted_style)); - } - if (maps.containsKey(allow_http)) { - storageOptions.put(allow_http, maps.get(allow_http)); - } - return storageOptions; + public static Map<String, String> genStorageOptions(LanceConfig config) { + return config.getOptions(); } @Override diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java index 827c9582d..bb2df9389 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java @@ -26,31 +26,14 @@ import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TablePath; import com.lancedb.lance.WriteParams; -import org.apache.arrow.vector.types.TimeUnit; -import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import java.util.ArrayList; import java.util.List; -import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; -import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; -import static 
com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; - /** A Lance implementation of {@link LakeCatalog}. */ public class LanceLakeCatalog implements LakeCatalog { - private static final List<Field> SYSTEM_COLUMNS = new ArrayList<>(); - - static { - SYSTEM_COLUMNS.add(Field.nullable(BUCKET_COLUMN_NAME, new ArrowType.Int(32, true))); - SYSTEM_COLUMNS.add(Field.nullable(OFFSET_COLUMN_NAME, new ArrowType.Int(64, true))); - SYSTEM_COLUMNS.add( - Field.nullable( - TIMESTAMP_COLUMN_NAME, - new ArrowType.Timestamp(TimeUnit.MICROSECOND, null))); - } - private final Configuration options; public LanceLakeCatalog(Configuration config) { @@ -67,7 +50,10 @@ public class LanceLakeCatalog implements LakeCatalog { LanceConfig config = LanceConfig.from( - options.toMap(), tablePath.getDatabaseName(), tablePath.getTableName()); + options.toMap(), + tableDescriptor.getCustomProperties(), + tablePath.getDatabaseName(), + tablePath.getTableName()); WriteParams params = LanceConfig.genWriteParamsFromConfig(config); List<Field> fields = new ArrayList<>(); @@ -75,8 +61,6 @@ public class LanceLakeCatalog implements LakeCatalog { fields.addAll( LanceArrowUtils.toArrowSchema(tableDescriptor.getSchema().getRowType()) .getFields()); - // add system metadata columns to schema - fields.addAll(SYSTEM_COLUMNS); try { LanceDatasetAdapter.createDataset(config.getDatasetUri(), new Schema(fields), params); } catch (RuntimeException e) { diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java index 5864ccf77..c4b51cd9c 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java @@ -20,6 +20,7 @@ package com.alibaba.fluss.lake.lance; import com.alibaba.fluss.config.Configuration; import 
com.alibaba.fluss.lake.lakestorage.LakeStorage; import com.alibaba.fluss.lake.lance.tiering.LanceCommittable; +import com.alibaba.fluss.lake.lance.tiering.LanceLakeTieringFactory; import com.alibaba.fluss.lake.lance.tiering.LanceWriteResult; import com.alibaba.fluss.lake.source.LakeSource; import com.alibaba.fluss.lake.writer.LakeTieringFactory; @@ -35,7 +36,7 @@ public class LanceLakeStorage implements LakeStorage { @Override public LakeTieringFactory<LanceWriteResult, LanceCommittable> createLakeTieringFactory() { - throw new UnsupportedOperationException("Not implemented"); + return new LanceLakeTieringFactory(config); } @Override diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java new file mode 100644 index 000000000..053b348b3 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.lake.lance.tiering; + +import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils; +import com.alibaba.fluss.lake.lance.writers.ArrowFieldWriter; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.types.RowType; + +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; + +/** An Arrow writer for {@link InternalRow}. */ +public class ArrowWriter { + private final VectorSchemaRoot root; + + private final ArrowFieldWriter<InternalRow>[] fieldWriters; + + private int recordsCount; + + /** + * Writer which serializes the Fluss rows to Arrow record batches. + * + * @param fieldWriters An array of writers which are responsible for the serialization of each + * column of the rows + * @param root Container that holds a set of vectors for the rows + */ + public ArrowWriter(ArrowFieldWriter<InternalRow>[] fieldWriters, VectorSchemaRoot root) { + this.fieldWriters = fieldWriters; + this.root = root; + } + + public static ArrowWriter create(VectorSchemaRoot root, RowType rowType) { + ArrowFieldWriter<InternalRow>[] fieldWriters = + new ArrowFieldWriter[root.getFieldVectors().size()]; + for (int i = 0; i < fieldWriters.length; i++) { + FieldVector fieldVector = root.getVector(i); + + fieldWriters[i] = + LanceArrowUtils.createArrowFieldWriter(fieldVector, rowType.getTypeAt(i)); + } + return new ArrowWriter(fieldWriters, root); + } + + /** Writes the specified row which is serialized into Arrow format. 
*/ + public void writeRow(InternalRow row) { + for (int i = 0; i < fieldWriters.length; i++) { + fieldWriters[i].write(row, i, true); + } + recordsCount++; + } + + public void finish() { + root.setRowCount(recordsCount); + for (ArrowFieldWriter<InternalRow> fieldWriter : fieldWriters) { + fieldWriter.finish(); + } + } +} diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java new file mode 100644 index 000000000..e2d0f4944 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.lake.lance.tiering; + +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.types.RowType; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.io.IOException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static com.alibaba.fluss.utils.Preconditions.checkArgument; +import static com.alibaba.fluss.utils.Preconditions.checkNotNull; + +/** A custom Arrow reader that supports writing Fluss internal rows while reading data in batches. */ +public class LanceArrowWriter extends ArrowReader { + private final Schema schema; + private final RowType rowType; + private final int batchSize; + + private volatile boolean finished; + + private final AtomicLong totalBytesRead = new AtomicLong(); + private ArrowWriter arrowWriter = null; + private final AtomicInteger count = new AtomicInteger(0); + private final Semaphore writeToken; + private final Semaphore loadToken; + + public LanceArrowWriter( + BufferAllocator allocator, Schema schema, int batchSize, RowType rowType) { + super(allocator); + checkNotNull(schema); + checkArgument(batchSize > 0); + this.schema = schema; + this.rowType = rowType; + this.batchSize = batchSize; + this.writeToken = new Semaphore(0); + this.loadToken = new Semaphore(0); + } + + void write(LogRecord row) { + checkNotNull(row); + try { + // wait until prepareLoadNextBatch releases the write token + writeToken.acquire(); + arrowWriter.writeRow(row.getRow()); + if (count.incrementAndGet() == batchSize) { + // notify loadNextBatch to take the batch + loadToken.release(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + void setFinished() { + loadToken.release(); + finished = true; + } + + @Override + public void prepareLoadNextBatch() throws IOException { + 
super.prepareLoadNextBatch(); + arrowWriter = ArrowWriter.create(this.getVectorSchemaRoot(), rowType); + // release batch size token for write + writeToken.release(batchSize); + } + + @Override + public boolean loadNextBatch() throws IOException { + prepareLoadNextBatch(); + try { + if (finished && count.get() == 0) { + return false; + } + // wait until the batch is full or finished + loadToken.acquire(); + arrowWriter.finish(); + if (!finished) { + count.set(0); + return true; + } else { + // return true if it has some rows and false if there is no record + if (count.get() > 0) { + count.set(0); + return true; + } else { + return false; + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public long bytesRead() { + throw new UnsupportedOperationException(); + } + + @Override + protected synchronized void closeReadSource() throws IOException { + // Implement if needed + } + + @Override + protected Schema readSchema() { + return this.schema; + } +} diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java new file mode 100644 index 000000000..abe7efc83 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.tiering; + +import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer; + +import com.lancedb.lance.FragmentMetadata; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.List; + +/** The serializer of {@link LanceCommittable}. */ +public class LanceCommittableSerializer implements SimpleVersionedSerializer<LanceCommittable> { + private static final int CURRENT_VERSION = 1; + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(LanceCommittable lanceCommittable) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(lanceCommittable.committable()); + return baos.toByteArray(); + } + } + + @Override + public LanceCommittable deserialize(int version, byte[] serialized) throws IOException { + if (version != CURRENT_VERSION) { + throw new UnsupportedOperationException( + "Expecting LanceCommittable version to be " + + CURRENT_VERSION + + ", but found " + + version + + "."); + } + try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + ObjectInputStream ois = new ObjectInputStream(bais)) { + //noinspection unchecked + return new LanceCommittable((List<FragmentMetadata>) ois.readObject()); + } catch (ClassNotFoundException e) { + throw new IOException("Couldn't deserialize 
LanceCommittable", e); + } + } +} diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java new file mode 100644 index 000000000..3eae2c792 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.lake.lance.tiering; + +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.lake.committer.BucketOffset; +import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot; +import com.alibaba.fluss.lake.committer.LakeCommitter; +import com.alibaba.fluss.lake.lance.LanceConfig; +import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde; +import com.alibaba.fluss.utils.types.Tuple2; + +import com.lancedb.lance.Dataset; +import com.lancedb.lance.FragmentMetadata; +import com.lancedb.lance.ReadOptions; +import com.lancedb.lance.Transaction; +import com.lancedb.lance.Version; +import org.apache.arrow.memory.RootAllocator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY; +import static com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; + +/** Implementation of {@link LakeCommitter} for Lance. 
*/ +public class LanceLakeCommitter implements LakeCommitter<LanceWriteResult, LanceCommittable> { + private final LanceConfig config; + private static final String committerName = "commit-user"; + private final RootAllocator allocator = new RootAllocator(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public LanceLakeCommitter(Configuration options, TablePath tablePath) { + this.config = + LanceConfig.from( + options.toMap(), + Collections.emptyMap(), + tablePath.getDatabaseName(), + tablePath.getTableName()); + } + + @Override + public LanceCommittable toCommittable(List<LanceWriteResult> lanceWriteResults) + throws IOException { + List<FragmentMetadata> fragments = + lanceWriteResults.stream() + .map(LanceWriteResult::commitMessage) + .flatMap(List::stream) + .collect(Collectors.toList()); + return new LanceCommittable(fragments); + } + + @Override + public long commit(LanceCommittable committable, Map<String, String> snapshotProperties) + throws IOException { + Map<String, String> properties = new HashMap<>(snapshotProperties); + properties.put(committerName, FLUSS_LAKE_TIERING_COMMIT_USER); + return LanceDatasetAdapter.commitAppend(config, committable.committable(), properties); + } + + @Override + public void abort(LanceCommittable committable) throws IOException { + // TODO lance does not have the API to proactively delete the written files yet, see + // https://github.com/lancedb/lance/issues/4508 + } + + @SuppressWarnings("checkstyle:LocalVariableName") + @Nullable + @Override + public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSnapshotIdOfFluss) + throws IOException { + Tuple2<Version, Transaction> latestLakeSnapshotIdOfLake = + getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER); + + if (latestLakeSnapshotIdOfLake == null) { + return null; + } + + // we get the latest snapshot committed by fluss, + // but the latest snapshot is not greater than latestLakeSnapshotIdOfFluss, no any 
missing + // snapshot, return directly + if (latestLakeSnapshotIdOfFluss != null + && latestLakeSnapshotIdOfLake.f0.getId() <= latestLakeSnapshotIdOfFluss) { + return null; + } + + CommittedLakeSnapshot committedLakeSnapshot = + new CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.f0.getId()); + String flussOffsetProperties = + latestLakeSnapshotIdOfLake + .f1 + .transactionProperties() + .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY); + for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) { + BucketOffset bucketOffset = BucketOffsetJsonSerde.INSTANCE.deserialize(node); + if (bucketOffset.getPartitionId() != null) { + committedLakeSnapshot.addPartitionBucket( + bucketOffset.getPartitionId(), + bucketOffset.getPartitionQualifiedName(), + bucketOffset.getBucket(), + bucketOffset.getLogOffset()); + } else { + committedLakeSnapshot.addBucket( + bucketOffset.getBucket(), bucketOffset.getLogOffset()); + } + } + return committedLakeSnapshot; + } + + @Nullable + private Tuple2<Version, Transaction> getCommittedLatestSnapshotOfLake(String commitUser) { + ReadOptions.Builder builder = new ReadOptions.Builder(); + builder.setStorageOptions(LanceConfig.genStorageOptions(config)); + try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) { + List<Version> versions = dataset.listVersions(); + for (int i = versions.size() - 1; i >= 0; i--) { + Version version = versions.get(i); + builder.setVersion((int) version.getId()); + try (Dataset datasetRead = + Dataset.open(allocator, config.getDatasetUri(), builder.build())) { + Transaction transaction = datasetRead.readTransaction().orElse(null); + if (transaction != null + && commitUser.equals( + transaction.transactionProperties().get(committerName))) { + return Tuple2.of(version, transaction); + } + } + } + } + return null; + } + + @Override + public void close() throws Exception { + allocator.close(); + } +} diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeTieringFactory.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeTieringFactory.java new file mode 100644 index 000000000..1bd0894b2 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeTieringFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.tiering; + +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.lake.committer.CommitterInitContext; +import com.alibaba.fluss.lake.committer.LakeCommitter; +import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer; +import com.alibaba.fluss.lake.writer.LakeTieringFactory; +import com.alibaba.fluss.lake.writer.LakeWriter; +import com.alibaba.fluss.lake.writer.WriterInitContext; + +import java.io.IOException; + +/** Implementation of {@link LakeTieringFactory} for Lance . 
*/ +public class LanceLakeTieringFactory + implements LakeTieringFactory<LanceWriteResult, LanceCommittable> { + private final Configuration config; + + public LanceLakeTieringFactory(Configuration config) { + this.config = config; + } + + @Override + public LakeWriter<LanceWriteResult> createLakeWriter(WriterInitContext writerInitContext) + throws IOException { + return new LanceLakeWriter(config, writerInitContext); + } + + @Override + public SimpleVersionedSerializer<LanceWriteResult> getWriteResultSerializer() { + return new LanceWriteResultSerializer(); + } + + @Override + public LakeCommitter<LanceWriteResult, LanceCommittable> createLakeCommitter( + CommitterInitContext committerInitContext) throws IOException { + return new LanceLakeCommitter(config, committerInitContext.tablePath()); + } + + @Override + public SimpleVersionedSerializer<LanceCommittable> getCommittableSerializer() { + return new LanceCommittableSerializer(); + } +} diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java new file mode 100644 index 000000000..73c0fdfdb --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.tiering; + +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.lake.lance.LanceConfig; +import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter; +import com.alibaba.fluss.lake.writer.LakeWriter; +import com.alibaba.fluss.lake.writer.WriterInitContext; +import com.alibaba.fluss.record.LogRecord; + +import com.lancedb.lance.FragmentMetadata; +import com.lancedb.lance.WriteParams; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; + +/** Implementation of {@link LakeWriter} for Lance. 
*/
+public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
+    /** Receives the records passed to {@link #write(LogRecord)}. */
+    private final LanceArrowWriter arrowWriter;
+
+    /** Task run on the thread started by the constructor; yields the created fragments. */
+    private final FutureTask<List<FragmentMetadata>> fragmentCreationTask;
+
+    /**
+     * Resolves the Lance dataset of the table being written and starts a thread that creates
+     * fragments from the Arrow writer.
+     *
+     * @param options configuration passed to {@link LanceConfig#from} as a map
+     * @param writerInitContext supplies the table path, custom properties and table schema
+     * @throws IOException if no schema can be obtained for the configured dataset
+     */
+    public LanceLakeWriter(Configuration options, WriterInitContext writerInitContext)
+            throws IOException {
+        LanceConfig lanceConfig =
+                LanceConfig.from(
+                        options.toMap(),
+                        writerInitContext.customProperties(),
+                        writerInitContext.tablePath().getDatabaseName(),
+                        writerInitContext.tablePath().getTableName());
+        int rowsPerBatch = LanceConfig.getBatchSize(lanceConfig);
+
+        Optional<Schema> datasetSchema = LanceDatasetAdapter.getSchema(lanceConfig);
+        if (!datasetSchema.isPresent()) {
+            throw new IOException(
+                    "Fail to get dataset " + lanceConfig.getDatasetUri() + " in Lance.");
+        }
+        this.arrowWriter =
+                LanceDatasetAdapter.getArrowWriter(
+                        datasetSchema.get(),
+                        rowsPerBatch,
+                        writerInitContext.schema().getRowType());
+
+        WriteParams writeParams = LanceConfig.genWriteParamsFromConfig(lanceConfig);
+        Callable<List<FragmentMetadata>> createFragments =
+                () ->
+                        LanceDatasetAdapter.createFragment(
+                                lanceConfig.getDatasetUri(), arrowWriter, writeParams);
+        this.fragmentCreationTask = new FutureTask<>(createFragments);
+        // The task runs on its own thread; complete() waits for its result.
+        new Thread(fragmentCreationTask).start();
+    }
+
+    /** Forwards the record to the Arrow writer. */
+    @Override
+    public void write(LogRecord record) throws IOException {
+        arrowWriter.write(record);
+    }
+
+    /**
+     * Marks the Arrow writer as finished and waits for the fragment creation task.
+     *
+     * @return a write result wrapping the fragment metadata produced by the task
+     * @throws IOException if the wait is interrupted or the task failed
+     */
+    @Override
+    public LanceWriteResult complete() throws IOException {
+        arrowWriter.setFinished();
+        final List<FragmentMetadata> createdFragments;
+        try {
+            createdFragments = fragmentCreationTask.get();
+        } catch (ExecutionException failure) {
+            throw new IOException("Exception in reader thread", failure);
+        } catch (InterruptedException interruption) {
+            // Restore the interrupt flag before translating the exception.
+            Thread.currentThread().interrupt();
+            throw new IOException(
+                    "Interrupted while waiting for reader thread to finish", interruption);
+        }
+        return new LanceWriteResult(createdFragments);
+    }
+
+    /** Closes the Arrow writer. */
+    @Override
+    public void close() throws IOException {
+        arrowWriter.close();
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java new file mode 100644 index 000000000..81c1b8450 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.tiering; + +import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +/** The {@link SimpleVersionedSerializer} for {@link LanceWriteResult}. 
*/
+public class LanceWriteResultSerializer implements SimpleVersionedSerializer<LanceWriteResult> {
+    private static final int CURRENT_VERSION = 1;
+
+    /** Returns the version of the serialization format, currently {@code 1}. */
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    /**
+     * Serializes the write result with Java object serialization.
+     *
+     * @param lanceWriteResult the write result to serialize
+     * @return the serialized bytes
+     * @throws IOException if writing to the object stream fails
+     */
+    @Override
+    public byte[] serialize(LanceWriteResult lanceWriteResult) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(lanceWriteResult);
+            // ObjectOutputStream buffers internally: flush before snapshotting the bytes so the
+            // result does not depend on when the stream happens to drain its buffer.
+            oos.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    /**
+     * Restores a write result from bytes produced by {@link #serialize(LanceWriteResult)}.
+     *
+     * @param version the version the bytes were written with; not consulted by this method
+     * @param serialized the serialized bytes
+     * @return the deserialized write result
+     * @throws IOException if reading fails or the serialized class cannot be found
+     */
+    @Override
+    public LanceWriteResult deserialize(int version, byte[] serialized) throws IOException {
+        // NOTE(review): Java-native deserialization should only ever see trusted bytes; confirm
+        // that the input always originates from this serializer.
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                ObjectInputStream ois = new ObjectInputStream(bais)) {
+            return (LanceWriteResult) ois.readObject();
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Couldn't deserialize LanceWriteResult", e);
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java
index cf92952c9..4de499ffe 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java
@@ -17,6 +17,23 @@
 package com.alibaba.fluss.lake.lance.utils;
+import com.alibaba.fluss.lake.lance.writers.ArrowBigIntWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowBinaryWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowBooleanWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowDateWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowDecimalWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowDoubleWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowFieldWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowFloatWriter;
+import
com.alibaba.fluss.lake.lance.writers.ArrowIntWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowSmallIntWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowTimeWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowTimestampLtzWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowTimestampNtzWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowTinyIntWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowVarBinaryWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowVarCharWriter;
+import com.alibaba.fluss.row.InternalRow;
 import com.alibaba.fluss.types.BigIntType;
 import com.alibaba.fluss.types.BinaryType;
 import com.alibaba.fluss.types.BooleanType;
@@ -37,6 +54,24 @@ import com.alibaba.fluss.types.TimeType;
 import com.alibaba.fluss.types.TimestampType;
 import com.alibaba.fluss.types.TinyIntType;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.TimeUnit;
@@ -187,4 +222,57 @@ public class LanceArrowUtils {
                             "Unsupported data type %s currently.", dataType.asSummaryString()));
         }
     }
+
+    /** Returns the precision reported by the given Arrow decimal vector. */
+    private static int getPrecision(DecimalVector decimalVector) {
+        return decimalVector.getPrecision();
+    }
+
+    /**
+     * Picks the {@link ArrowFieldWriter} matching the concrete class of the given Arrow vector.
+     *
+     * <p>The Fluss data type is only consulted for timestamp vectors without a timezone, where
+     * it selects between the local-zoned and the plain timestamp writer and supplies the
+     * precision. The checks run in a fixed order and the first match wins.
+     *
+     * @param vector the Arrow vector the returned writer is bound to
+     * @param dataType the Fluss type of the column backed by {@code vector}
+     * @return a writer bound to {@code vector}
+     * @throws UnsupportedOperationException if no branch matches the vector
+     */
+    public static ArrowFieldWriter<InternalRow> createArrowFieldWriter(
+            ValueVector vector, DataType dataType) {
+        if (vector instanceof TinyIntVector) {
+            return ArrowTinyIntWriter.forField((TinyIntVector) vector);
+        }
+        if (vector instanceof SmallIntVector) {
+            return ArrowSmallIntWriter.forField((SmallIntVector) vector);
+        }
+        if (vector instanceof IntVector) {
+            return ArrowIntWriter.forField((IntVector) vector);
+        }
+        if (vector instanceof BigIntVector) {
+            return ArrowBigIntWriter.forField((BigIntVector) vector);
+        }
+        if (vector instanceof BitVector) {
+            return ArrowBooleanWriter.forField((BitVector) vector);
+        }
+        if (vector instanceof Float4Vector) {
+            return ArrowFloatWriter.forField((Float4Vector) vector);
+        }
+        if (vector instanceof Float8Vector) {
+            return ArrowDoubleWriter.forField((Float8Vector) vector);
+        }
+        if (vector instanceof VarCharVector) {
+            return ArrowVarCharWriter.forField((VarCharVector) vector);
+        }
+        if (vector instanceof FixedSizeBinaryVector) {
+            return ArrowBinaryWriter.forField((FixedSizeBinaryVector) vector);
+        }
+        if (vector instanceof VarBinaryVector) {
+            return ArrowVarBinaryWriter.forField((VarBinaryVector) vector);
+        }
+        if (vector instanceof DecimalVector) {
+            DecimalVector decimalVector = (DecimalVector) vector;
+            return ArrowDecimalWriter.forField(
+                    decimalVector, getPrecision(decimalVector), decimalVector.getScale());
+        }
+        if (vector instanceof DateDayVector) {
+            return ArrowDateWriter.forField((DateDayVector) vector);
+        }
+        if (vector instanceof TimeSecVector
+                || vector instanceof TimeMilliVector
+                || vector instanceof TimeMicroVector
+                || vector instanceof TimeNanoVector) {
+            return ArrowTimeWriter.forField(vector);
+        }
+        if (vector instanceof TimeStampVector
+                && ((ArrowType.Timestamp) vector.getField().getType()).getTimezone() == null) {
+            if (dataType instanceof LocalZonedTimestampType) {
+                return ArrowTimestampLtzWriter.forField(
+                        vector, ((LocalZonedTimestampType) dataType).getPrecision());
+            }
+            return ArrowTimestampNtzWriter.forField(
+                    vector, ((TimestampType) dataType).getPrecision());
+        }
+        throw new UnsupportedOperationException(
+                String.format("Unsupported type %s.", dataType));
+    }
 }
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java
index ed50abcc9..54ddefbef 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java
@@ -18,14 +18,25 @@
 package com.alibaba.fluss.lake.lance.utils;
 import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.tiering.LanceArrowWriter;
+import com.alibaba.fluss.types.RowType;
 import com.lancedb.lance.Dataset;
+import com.lancedb.lance.Fragment;
+import com.lancedb.lance.FragmentMetadata;
 import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
 import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.operation.Append;
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.types.pojo.Schema;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 /** Lance dataset API adapter.
*/
@@ -46,4 +57,33 @@ public class LanceDatasetAdapter {
             return Optional.empty();
         }
     }
+
+    /**
+     * Appends the given fragments to the configured dataset through a Lance transaction.
+     *
+     * @param config supplies the dataset URI and the read options used to open the dataset
+     * @param fragments metadata of the fragments to append
+     * @param properties properties attached to the transaction
+     * @return the version reported by the dataset returned from the commit
+     */
+    public static long commitAppend(
+            LanceConfig config, List<FragmentMetadata> fragments, Map<String, String> properties) {
+        String datasetUri = config.getDatasetUri();
+        ReadOptions readOptions = LanceConfig.genReadOptionFromConfig(config);
+        try (Dataset current = Dataset.open(allocator, datasetUri, readOptions)) {
+            Transaction appendTransaction =
+                    current.newTransactionBuilder()
+                            .operation(Append.builder().fragments(fragments).build())
+                            .transactionProperties(properties)
+                            .build();
+            try (Dataset committed = appendTransaction.commit()) {
+                // note: lance dataset version starts from 1
+                return committed.version();
+            }
+        }
+    }
+
+    /** Creates a {@link LanceArrowWriter} that uses this adapter's allocator. */
+    public static LanceArrowWriter getArrowWriter(Schema schema, int batchSize, RowType rowType) {
+        return new LanceArrowWriter(allocator, schema, batchSize, rowType);
+    }
+
+    /**
+     * Exports the reader as an Arrow array stream and creates Lance fragments from it.
+     *
+     * @param datasetUri URI of the dataset the fragments are created for
+     * @param reader source of the Arrow data to export
+     * @param params write parameters passed to {@link Fragment#create}
+     * @return metadata of the created fragments
+     */
+    public static List<FragmentMetadata> createFragment(
+            String datasetUri, ArrowReader reader, WriteParams params) {
+        try (ArrowArrayStream exportedStream = ArrowArrayStream.allocateNew(allocator)) {
+            Data.exportArrayStream(allocator, reader, exportedStream);
+            return Fragment.create(datasetUri, exportedStream, params);
+        }
+    }
 }
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBigIntWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBigIntWriter.java
new file mode 100644
index 000000000..55026de5b
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBigIntWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.BigIntVector;
+
+/** {@link ArrowFieldWriter} for BigInt. */
+public class ArrowBigIntWriter extends ArrowFieldWriter<InternalRow> {
+
+    /** Creates a writer bound to the given vector. */
+    public static ArrowBigIntWriter forField(BigIntVector bigIntVector) {
+        return new ArrowBigIntWriter(bigIntVector);
+    }
+
+    private ArrowBigIntWriter(BigIntVector bigIntVector) {
+        super(bigIntVector);
+    }
+
+    /**
+     * Writes the long at {@code ordinal} of {@code row} into the vector slot at the current
+     * count, or marks that slot null when the field is null.
+     *
+     * @param row the row to read from
+     * @param ordinal position of the field inside {@code row}
+     * @param handleSafe whether to write through {@code setSafe} instead of {@code set}
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        BigIntVector vector = (BigIntVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+        } else if (handleSafe) {
+            vector.setSafe(getCount(), readLong(row, ordinal));
+        } else {
+            // Without this branch a non-null value was silently dropped when handleSafe is
+            // false, although write() still advanced the count.
+            vector.set(getCount(), readLong(row, ordinal));
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private long readLong(InternalRow row, int ordinal) {
+        return row.getLong(ordinal);
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBinaryWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBinaryWriter.java
new file mode 100644
index 000000000..b33fd83f4
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBinaryWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software
Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+
+/** {@link ArrowFieldWriter} for Binary. */
+public class ArrowBinaryWriter extends ArrowFieldWriter<InternalRow> {
+
+    /** Byte width of the vector, captured at construction and used when reading the row. */
+    private final int byteWidth;
+
+    private ArrowBinaryWriter(FixedSizeBinaryVector binaryVector) {
+        super(binaryVector);
+        this.byteWidth = binaryVector.getByteWidth();
+    }
+
+    /** Creates a writer bound to the given vector. */
+    public static ArrowBinaryWriter forField(FixedSizeBinaryVector binaryVector) {
+        return new ArrowBinaryWriter(binaryVector);
+    }
+
+    /**
+     * Writes the binary value at {@code ordinal} of {@code row} into the slot at the current
+     * count, or marks that slot null when the field is null.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        FixedSizeBinaryVector target = (FixedSizeBinaryVector) getValueVector();
+        int index = getCount();
+        if (row.isNullAt(ordinal)) {
+            target.setNull(index);
+            return;
+        }
+        byte[] bytes = row.getBinary(ordinal, byteWidth);
+        if (handleSafe) {
+            target.setSafe(index, bytes);
+        } else {
+            target.set(index, bytes);
+        }
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBooleanWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBooleanWriter.java
new file mode 100644
index 000000000..80b8e79d8
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBooleanWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.BitVector;
+
+/** {@link ArrowFieldWriter} for Boolean. */
+public class ArrowBooleanWriter extends ArrowFieldWriter<InternalRow> {
+
+    private ArrowBooleanWriter(BitVector bitVector) {
+        super(bitVector);
+    }
+
+    /** Creates a writer bound to the given vector. */
+    public static ArrowBooleanWriter forField(BitVector booleanVector) {
+        return new ArrowBooleanWriter(booleanVector);
+    }
+
+    /**
+     * Writes the boolean at {@code ordinal} of {@code row} as {@code 1} or {@code 0} into the
+     * slot at the current count, or marks that slot null when the field is null.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        BitVector target = (BitVector) getValueVector();
+        int index = getCount();
+        if (row.isNullAt(ordinal)) {
+            target.setNull(index);
+            return;
+        }
+        int bit = row.getBoolean(ordinal) ? 1 : 0;
+        if (handleSafe) {
+            target.setSafe(index, bit);
+        } else {
+            target.set(index, bit);
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDateWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDateWriter.java
new file mode 100644
index 000000000..141df39a8
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDateWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.DateDayVector;
+
+/** {@link ArrowFieldWriter} for Date.
*/
+public class ArrowDateWriter extends ArrowFieldWriter<InternalRow> {
+
+    private ArrowDateWriter(DateDayVector dateDayVector) {
+        super(dateDayVector);
+    }
+
+    /** Creates a writer bound to the given vector. */
+    public static ArrowDateWriter forField(DateDayVector dateDayVector) {
+        return new ArrowDateWriter(dateDayVector);
+    }
+
+    /**
+     * Writes the int read at {@code ordinal} of {@code row} into the slot at the current count,
+     * or marks that slot null when the field is null.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        DateDayVector target = (DateDayVector) getValueVector();
+        int index = getCount();
+        if (row.isNullAt(ordinal)) {
+            target.setNull(index);
+            return;
+        }
+        int date = row.getInt(ordinal);
+        if (handleSafe) {
+            target.setSafe(index, date);
+        } else {
+            target.set(index, date);
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDecimalWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDecimalWriter.java
new file mode 100644
index 000000000..f4e2e4ab9
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDecimalWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.DecimalVector;
+
+import java.math.BigDecimal;
+
+/** {@link ArrowFieldWriter} for Decimal. */
+public class ArrowDecimalWriter extends ArrowFieldWriter<InternalRow> {
+
+    /** Precision passed to {@link InternalRow#getDecimal} when reading the row. */
+    private final int precision;
+
+    /** Scale passed to {@link InternalRow#getDecimal} when reading the row. */
+    private final int scale;
+
+    private ArrowDecimalWriter(DecimalVector decimalVector, int precision, int scale) {
+        super(decimalVector);
+        this.precision = precision;
+        this.scale = scale;
+    }
+
+    /** Creates a writer bound to the given vector that reads with the given precision/scale. */
+    public static ArrowDecimalWriter forField(
+            DecimalVector decimalVector, int precision, int scale) {
+        return new ArrowDecimalWriter(decimalVector, precision, scale);
+    }
+
+    /**
+     * Writes the decimal at {@code ordinal} of {@code row} into the slot at the current count.
+     * The slot is marked null when the field is null or its {@link BigDecimal} form is null.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        DecimalVector target = (DecimalVector) getValueVector();
+        int index = getCount();
+        if (row.isNullAt(ordinal)) {
+            target.setNull(index);
+            return;
+        }
+        BigDecimal value = readDecimal(row, ordinal).toBigDecimal();
+        if (value == null) {
+            target.setNull(index);
+        } else if (handleSafe) {
+            target.setSafe(index, value);
+        } else {
+            target.set(index, value);
+        }
+    }
+
+    private Decimal readDecimal(InternalRow row, int ordinal) {
+        return row.getDecimal(ordinal, precision, scale);
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDoubleWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDoubleWriter.java
new file mode 100644
index 000000000..f3d54bdf3
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDoubleWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.Float8Vector;
+
+/** {@link ArrowFieldWriter} for Double. */
+public class ArrowDoubleWriter extends ArrowFieldWriter<InternalRow> {
+
+    private ArrowDoubleWriter(Float8Vector doubleVector) {
+        super(doubleVector);
+    }
+
+    /** Creates a writer bound to the given vector. */
+    public static ArrowDoubleWriter forField(Float8Vector doubleVector) {
+        return new ArrowDoubleWriter(doubleVector);
+    }
+
+    /**
+     * Writes the double at {@code ordinal} of {@code row} into the slot at the current count,
+     * or marks that slot null when the field is null.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        Float8Vector target = (Float8Vector) getValueVector();
+        int index = getCount();
+        if (row.isNullAt(ordinal)) {
+            target.setNull(index);
+            return;
+        }
+        double value = row.getDouble(ordinal);
+        if (handleSafe) {
+            target.setSafe(index, value);
+        } else {
+            target.set(index, value);
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFieldWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFieldWriter.java
new file mode 100644
index 000000000..0278d2b29
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFieldWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import org.apache.arrow.vector.ValueVector;
+
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Base class for arrow field writer which is used to convert a field to an Arrow format.
+ *
+ * @param <IN> Type of the input to write. Currently, it's always InternalRow, may support
+ * InternalArray in the future.
+ */
+public abstract class ArrowFieldWriter<IN> {
+
+    /** Vector that receives the values written for one column. */
+    private final ValueVector valueVector;
+
+    /** Number of values written so far; also the index of the next slot to fill. */
+    private int count;
+
+    /**
+     * Binds the writer to its target vector.
+     *
+     * @param valueVector the vector to write into; checked with {@code checkNotNull}
+     */
+    public ArrowFieldWriter(ValueVector valueVector) {
+        this.valueVector = checkNotNull(valueVector);
+    }
+
+    /** Returns the vector this writer writes into. */
+    public ValueVector getValueVector() {
+        return this.valueVector;
+    }
+
+    /** Returns how many values have been written through {@link #write}. */
+    public int getCount() {
+        return this.count;
+    }
+
+    /** Stores the field at {@code ordinal} of {@code row} without advancing the count. */
+    public abstract void doWrite(IN row, int ordinal, boolean handleSafe);
+
+    /** Delegates to {@link #doWrite} and then advances the count by one. */
+    public void write(IN row, int ordinal, boolean handleSafe) {
+        doWrite(row, ordinal, handleSafe);
+        this.count += 1;
+    }
+
+    /** Sets the value count of the vector to the number of values written. */
+    public void finish() {
+        this.valueVector.setValueCount(this.count);
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFloatWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFloatWriter.java
new file mode 100644
index 000000000..e971e7735
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFloatWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.Float4Vector;
+
+/** {@link ArrowFieldWriter} for Float.
*/
+public class ArrowFloatWriter extends ArrowFieldWriter<InternalRow> {
+
+    private ArrowFloatWriter(Float4Vector float4Vector) {
+        super(float4Vector);
+    }
+
+    /** Creates a writer bound to the given vector. */
+    public static ArrowFloatWriter forField(Float4Vector float4Vector) {
+        return new ArrowFloatWriter(float4Vector);
+    }
+
+    /**
+     * Writes the float at {@code ordinal} of {@code row} into the slot at the current count,
+     * or marks that slot null when the field is null.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        Float4Vector target = (Float4Vector) getValueVector();
+        int index = getCount();
+        if (row.isNullAt(ordinal)) {
+            target.setNull(index);
+            return;
+        }
+        float value = row.getFloat(ordinal);
+        if (handleSafe) {
+            target.setSafe(index, value);
+        } else {
+            target.set(index, value);
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowIntWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowIntWriter.java
new file mode 100644
index 000000000..06f397b8e
--- /dev/null
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowIntWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.writers; + +import com.alibaba.fluss.row.InternalRow; + +import org.apache.arrow.vector.IntVector; + +/** {@link ArrowFieldWriter} for Int. */ +public class ArrowIntWriter extends ArrowFieldWriter<InternalRow> { + + public static ArrowIntWriter forField(IntVector intVector) { + return new ArrowIntWriter(intVector); + } + + private ArrowIntWriter(IntVector intVector) { + super(intVector); + } + + @Override + public void doWrite(InternalRow row, int ordinal, boolean handleSafe) { + IntVector vector = (IntVector) getValueVector(); + if (isNullAt(row, ordinal)) { + vector.setNull(getCount()); + } else if (handleSafe) { + vector.setSafe(getCount(), readInt(row, ordinal)); + } else { + vector.set(getCount(), readInt(row, ordinal)); + } + } + + private boolean isNullAt(InternalRow row, int ordinal) { + return row.isNullAt(ordinal); + } + + int readInt(InternalRow row, int ordinal) { + return row.getInt(ordinal); + } +} diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowSmallIntWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowSmallIntWriter.java new file mode 100644 index 000000000..3b410ed6a --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowSmallIntWriter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.writers; + +import com.alibaba.fluss.row.InternalRow; + +import org.apache.arrow.vector.SmallIntVector; + +/** {@link ArrowFieldWriter} for SmallInt. */ +public class ArrowSmallIntWriter extends ArrowFieldWriter<InternalRow> { + + public static ArrowSmallIntWriter forField(SmallIntVector smallIntVector) { + return new ArrowSmallIntWriter(smallIntVector); + } + + private ArrowSmallIntWriter(SmallIntVector smallIntVector) { + super(smallIntVector); + } + + @Override + public void doWrite(InternalRow row, int ordinal, boolean handleSafe) { + SmallIntVector vector = (SmallIntVector) getValueVector(); + if (isNullAt(row, ordinal)) { + vector.setNull(getCount()); + } else if (handleSafe) { + vector.setSafe(getCount(), readShort(row, ordinal)); + } else { + vector.set(getCount(), readShort(row, ordinal)); + } + } + + public boolean isNullAt(InternalRow row, int ordinal) { + return row.isNullAt(ordinal); + } + + public short readShort(InternalRow row, int ordinal) { + return row.getShort(ordinal); + } +} diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimeWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimeWriter.java new file mode 100644 index 000000000..fcfe9fecb --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimeWriter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.writers; + +import com.alibaba.fluss.row.InternalRow; + +import org.apache.arrow.vector.BaseFixedWidthVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeSecVector; +import org.apache.arrow.vector.ValueVector; + +import static com.alibaba.fluss.utils.Preconditions.checkState; + +/** {@link ArrowFieldWriter} for Time. 
*/
+public class ArrowTimeWriter extends ArrowFieldWriter<InternalRow> {
+
+    /** Creates a writer for a second, millisecond, microsecond or nanosecond time vector. */
+    public static ArrowTimeWriter forField(ValueVector valueVector) {
+        return new ArrowTimeWriter(valueVector);
+    }
+
+    private ArrowTimeWriter(ValueVector valueVector) {
+        super(valueVector);
+        // Any vector other than the four Arrow time vectors is rejected up front.
+        final boolean supported =
+                valueVector instanceof TimeSecVector
+                        || valueVector instanceof TimeMilliVector
+                        || valueVector instanceof TimeMicroVector
+                        || valueVector instanceof TimeNanoVector;
+        checkState(supported);
+    }
+
+    /**
+     * Stores the time found at {@code ordinal} of {@code row} in the slot addressed by the current
+     * element count, scaled to the unit of the underlying vector, or marks that slot null when
+     * the field is null.
+     *
+     * @param row the row to read from
+     * @param ordinal the position of the field inside {@code row}
+     * @param handleSafe when true the value is stored via {@code setSafe}, otherwise via {@code set}
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        final ValueVector target = getValueVector();
+        final int index = getCount();
+        if (row.isNullAt(ordinal)) {
+            ((BaseFixedWidthVector) target).setNull(index);
+            return;
+        }
+
+        // The row value is an int that the conversions below treat as milliseconds.
+        final int millis = row.getInt(ordinal);
+        if (target instanceof TimeSecVector) {
+            final TimeSecVector secVector = (TimeSecVector) target;
+            final int seconds = millis / 1000;
+            if (handleSafe) {
+                secVector.setSafe(index, seconds);
+            } else {
+                secVector.set(index, seconds);
+            }
+        } else if (target instanceof TimeMilliVector) {
+            final TimeMilliVector milliVector = (TimeMilliVector) target;
+            if (handleSafe) {
+                milliVector.setSafe(index, millis);
+            } else {
+                milliVector.set(index, millis);
+            }
+        } else if (target instanceof TimeMicroVector) {
+            final TimeMicroVector microVector = (TimeMicroVector) target;
+            final long micros = millis * 1000L;
+            if (handleSafe) {
+                microVector.setSafe(index, micros);
+            } else {
+                microVector.set(index, micros);
+            }
+        } else {
+            final TimeNanoVector nanoVector = (TimeNanoVector) target;
+            final long nanos = millis * 1000000L;
+            if (handleSafe) {
+                nanoVector.setSafe(index, nanos);
+            } else {
+                nanoVector.set(index, nanos);
+            }
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java new file mode 100644 index 000000000..3ea10d399 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.writers; + +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; + +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampSecVector; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.types.pojo.ArrowType; + +import static com.alibaba.fluss.utils.Preconditions.checkState; + +/** {@link ArrowFieldWriter} for TimestampLtz. 
*/ +public class ArrowTimestampLtzWriter extends ArrowFieldWriter<InternalRow> { + public static ArrowTimestampLtzWriter forField(ValueVector valueVector, int precision) { + return new ArrowTimestampLtzWriter(valueVector, precision); + } + + private final int precision; + + private ArrowTimestampLtzWriter(ValueVector valueVector, int precision) { + super(valueVector); + checkState( + valueVector instanceof TimeStampVector + && ((ArrowType.Timestamp) valueVector.getField().getType()).getTimezone() + == null); + this.precision = precision; + } + + @Override + public void doWrite(InternalRow row, int ordinal, boolean handleSafe) { + TimeStampVector vector = (TimeStampVector) getValueVector(); + if (isNullAt(row, ordinal)) { + vector.setNull(getCount()); + } else { + TimestampLtz timestamp = readTimestamp(row, ordinal); + if (vector instanceof TimeStampSecVector) { + long sec = timestamp.getEpochMillisecond() / 1000; + if (handleSafe) { + vector.setSafe(getCount(), sec); + } else { + vector.set(getCount(), sec); + } + } else if (vector instanceof TimeStampMilliVector) { + long ms = timestamp.getEpochMillisecond(); + if (handleSafe) { + vector.setSafe(getCount(), ms); + } else { + vector.set(getCount(), ms); + } + } else if (vector instanceof TimeStampMicroVector) { + long microSec = + timestamp.getEpochMillisecond() * 1000 + + timestamp.getNanoOfMillisecond() / 1000; + if (handleSafe) { + vector.setSafe(getCount(), microSec); + } else { + vector.set(getCount(), microSec); + } + } else { + long nanoSec = + timestamp.getEpochMillisecond() * 1_000_000 + + timestamp.getNanoOfMillisecond(); + if (handleSafe) { + vector.setSafe(getCount(), nanoSec); + } else { + vector.set(getCount(), nanoSec); + } + } + } + } + + private boolean isNullAt(InternalRow row, int ordinal) { + return row.isNullAt(ordinal); + } + + private TimestampLtz readTimestamp(InternalRow row, int ordinal) { + return row.getTimestampLtz(ordinal, precision); + } +} diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java new file mode 100644 index 000000000..3d129be98 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.writers; + +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampNtz; + +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampSecVector; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.types.pojo.ArrowType; + +import static com.alibaba.fluss.utils.Preconditions.checkState; + +/** {@link ArrowFieldWriter} for TimestampNtz. 
*/ +public class ArrowTimestampNtzWriter extends ArrowFieldWriter<InternalRow> { + public static ArrowTimestampNtzWriter forField(ValueVector valueVector, int precision) { + return new ArrowTimestampNtzWriter(valueVector, precision); + } + + private final int precision; + + private ArrowTimestampNtzWriter(ValueVector valueVector, int precision) { + super(valueVector); + checkState( + valueVector instanceof TimeStampVector + && ((ArrowType.Timestamp) valueVector.getField().getType()).getTimezone() + == null); + this.precision = precision; + } + + @Override + public void doWrite(InternalRow row, int ordinal, boolean handleSafe) { + TimeStampVector vector = (TimeStampVector) getValueVector(); + if (isNullAt(row, ordinal)) { + vector.setNull(getCount()); + } else { + TimestampNtz timestamp = readTimestamp(row, ordinal); + if (vector instanceof TimeStampSecVector) { + long sec = timestamp.getMillisecond() / 1000; + if (handleSafe) { + vector.setSafe(getCount(), sec); + } else { + vector.set(getCount(), sec); + } + } else if (vector instanceof TimeStampMilliVector) { + long ms = timestamp.getMillisecond(); + if (handleSafe) { + vector.setSafe(getCount(), ms); + } else { + vector.set(getCount(), ms); + } + } else if (vector instanceof TimeStampMicroVector) { + long microSec = + timestamp.getMillisecond() * 1000 + timestamp.getNanoOfMillisecond() / 1000; + if (handleSafe) { + vector.setSafe(getCount(), microSec); + } else { + vector.set(getCount(), microSec); + } + } else { + long nanoSec = + timestamp.getMillisecond() * 1_000_000 + timestamp.getNanoOfMillisecond(); + if (handleSafe) { + vector.setSafe(getCount(), nanoSec); + } else { + vector.set(getCount(), nanoSec); + } + } + } + } + + private boolean isNullAt(InternalRow row, int ordinal) { + return row.isNullAt(ordinal); + } + + private TimestampNtz readTimestamp(InternalRow row, int ordinal) { + return row.getTimestampNtz(ordinal, precision); + } +} diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTinyIntWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTinyIntWriter.java new file mode 100644 index 000000000..805d24afe --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTinyIntWriter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.writers; + +import com.alibaba.fluss.row.InternalRow; + +import org.apache.arrow.vector.TinyIntVector; + +/** {@link ArrowFieldWriter} for TinyInt. 
*/ +public class ArrowTinyIntWriter extends ArrowFieldWriter<InternalRow> { + + public static ArrowTinyIntWriter forField(TinyIntVector tinyIntVector) { + return new ArrowTinyIntWriter(tinyIntVector); + } + + private ArrowTinyIntWriter(TinyIntVector tinyIntVector) { + super(tinyIntVector); + } + + @Override + public void doWrite(InternalRow row, int ordinal, boolean handleSafe) { + TinyIntVector vector = (TinyIntVector) getValueVector(); + if (isNullAt(row, ordinal)) { + vector.setNull(getCount()); + } else { + if (handleSafe) { + vector.setSafe(getCount(), readByte(row, ordinal)); + } else { + vector.set(getCount(), readByte(row, ordinal)); + } + } + } + + private boolean isNullAt(InternalRow row, int ordinal) { + return row.isNullAt(ordinal); + } + + private byte readByte(InternalRow row, int ordinal) { + return row.getByte(ordinal); + } +} diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarBinaryWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarBinaryWriter.java new file mode 100644 index 000000000..d3be0a653 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarBinaryWriter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.writers; + +import com.alibaba.fluss.row.InternalRow; + +import org.apache.arrow.vector.VarBinaryVector; + +/** {@link ArrowFieldWriter} for VarBinary. */ +public class ArrowVarBinaryWriter extends ArrowFieldWriter<InternalRow> { + + public static ArrowVarBinaryWriter forField(VarBinaryVector varBinaryVector) { + return new ArrowVarBinaryWriter(varBinaryVector); + } + + private ArrowVarBinaryWriter(VarBinaryVector varBinaryVector) { + super(varBinaryVector); + } + + @Override + public void doWrite(InternalRow row, int ordinal, boolean handleSafe) { + VarBinaryVector vector = (VarBinaryVector) getValueVector(); + if (isNullAt(row, ordinal)) { + vector.setNull(getCount()); + } else { + vector.setSafe(getCount(), readBinary(row, ordinal)); + } + } + + private boolean isNullAt(InternalRow row, int ordinal) { + return row.isNullAt(ordinal); + } + + private byte[] readBinary(InternalRow row, int ordinal) { + return row.getBytes(ordinal); + } +} diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarCharWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarCharWriter.java new file mode 100644 index 000000000..05b2d442c --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarCharWriter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.writers; + +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.InternalRow; + +import org.apache.arrow.vector.VarCharVector; + +import java.nio.ByteBuffer; + +/** {@link ArrowFieldWriter} for VarChar. */ +public class ArrowVarCharWriter extends ArrowFieldWriter<InternalRow> { + + public static ArrowVarCharWriter forField(VarCharVector varCharVector) { + return new ArrowVarCharWriter(varCharVector); + } + + private ArrowVarCharWriter(VarCharVector varCharVector) { + super(varCharVector); + } + + @Override + public void doWrite(InternalRow row, int ordinal, boolean handleSafe) { + VarCharVector vector = (VarCharVector) getValueVector(); + if (isNullAt(row, ordinal)) { + vector.setNull(getCount()); + } else { + ByteBuffer buffer = readString(row, ordinal).wrapByteBuffer(); + vector.setSafe(getCount(), buffer, buffer.position(), buffer.remaining()); + } + } + + private boolean isNullAt(InternalRow row, int ordinal) { + return row.isNullAt(ordinal); + } + + private BinaryString readString(InternalRow row, int ordinal) { + return row.getString(ordinal); + } +} diff --git a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/NOTICE b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/NOTICE index 786813d32..36053bfbc 100644 --- a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/NOTICE +++ b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/NOTICE @@ -21,7 +21,7 @@ This project bundles the following dependencies under the Apache Software Licens - 
org.apache.arrow:arrow-memory-netty:15.0.0 - org.apache.arrow:arrow-vector:15.0.0 - org.questdb:jar-jni:1.1.1 -- com.lancedb:lance-core:0.26.1 +- com.lancedb:lance-core:0.33.0 - commons-codec:commons-codec:1.4 - com.google.flatbuffers:flatbuffers-java:23.5.26 diff --git a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/licenses/LICENSE.zstd-jni b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/licenses/LICENSE.zstd-jni deleted file mode 100644 index 66abb8ae7..000000000 --- a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/licenses/LICENSE.zstd-jni +++ /dev/null @@ -1,26 +0,0 @@ -Zstd-jni: JNI bindings to Zstd Library - -Copyright (c) 2015-present, Luben Karavelov/ All rights reserved. - -BSD License - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -* Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -* Redistributions in binary form must reproduce the above copyright notice, this - list of conditions and the following disclaimer in the documentation and/or - other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR -ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON -ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java index 1b071e867..2be42fff4 100644 --- a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java @@ -45,10 +45,9 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.nio.file.Files; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; -import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; -import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; -import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -111,6 +110,8 @@ class LakeEnabledTableCreateITCase { @Test void testLogTable() throws Exception { + Map<String, String> customProperties = new HashMap<>(); + customProperties.put("lance.batch_size", "256"); // test bucket key log table TableDescriptor logTable = TableDescriptor.builder() @@ -134,11 +135,13 @@ class LakeEnabledTableCreateITCase { .column("log_c16", DataTypes.TIMESTAMP()) .build()) .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .customProperties(customProperties) .distributedBy(BUCKET_NUM, "log_c1", "log_c2") .build(); TablePath logTablePath = TablePath.of(DATABASE, "log_table"); admin.createTable(logTablePath, logTable, false).get(); - LanceConfig config = LanceConfig.from(lanceConf.toMap(), DATABASE, "log_table"); + LanceConfig config = + LanceConfig.from(lanceConf.toMap(), customProperties, DATABASE, "log_table"); // check the gotten log table Field logC1 = new Field("log_c1", FieldType.nullable(new ArrowType.Int(4 * 8, true)), 
null); @@ -182,25 +185,11 @@ class LakeEnabledTableCreateITCase { FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)), null); - // for __bucket, __offset, __timestamp - Field logC17 = - new Field( - BUCKET_COLUMN_NAME, FieldType.nullable(new ArrowType.Int(32, true)), null); - Field logC18 = - new Field( - OFFSET_COLUMN_NAME, FieldType.nullable(new ArrowType.Int(64, true)), null); - Field logC19 = - new Field( - TIMESTAMP_COLUMN_NAME, - FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)), - null); - org.apache.arrow.vector.types.pojo.Schema expectedSchema = new org.apache.arrow.vector.types.pojo.Schema( Arrays.asList( logC1, logC2, logC3, logC4, logC5, logC6, logC7, logC8, logC9, - logC10, logC11, logC12, logC13, logC14, logC15, logC16, logC17, - logC18, logC19)); + logC10, logC11, logC12, logC13, logC14, logC15, logC16)); assertThat(expectedSchema).isEqualTo(LanceDatasetAdapter.getSchema(config).get()); } diff --git a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java new file mode 100644 index 000000000..728e4a8a9 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.testutils; + +import com.alibaba.fluss.client.Connection; +import com.alibaba.fluss.client.ConnectionFactory; +import com.alibaba.fluss.client.admin.Admin; +import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.client.table.writer.AppendWriter; +import com.alibaba.fluss.client.table.writer.TableWriter; +import com.alibaba.fluss.client.table.writer.UpsertWriter; +import com.alibaba.fluss.config.AutoPartitionTimeUnit; +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.exception.FlussRuntimeException; +import com.alibaba.fluss.flink.tiering.LakeTieringJobBuilder; +import com.alibaba.fluss.metadata.DataLakeFormat; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.server.replica.Replica; +import com.alibaba.fluss.server.testutils.FlussClusterExtension; +import com.alibaba.fluss.types.DataTypes; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.nio.file.Files; +import java.time.Duration; 
+import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL; +import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test base for sync to lance by Flink. */ +public class FlinkLanceTieringTestBase { + + protected static final String DEFAULT_DB = "fluss"; + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder() + .setClusterConf(initConfig()) + .setNumOfTabletServers(3) + .build(); + + protected StreamExecutionEnvironment execEnv; + + protected static Connection conn; + protected static Admin admin; + protected static Configuration clientConf; + private static String warehousePath; + + private static Configuration initConfig() { + Configuration conf = new Configuration(); + conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)) + // not to clean snapshots for test purpose + .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE); + conf.setString("datalake.format", "lance"); + try { + warehousePath = + Files.createTempDirectory("fluss-testing-datalake-tiered") + .resolve("warehouse") + .toString(); + } catch (Exception e) { + throw new FlussRuntimeException("Failed to create warehouse path"); + } + conf.setString("datalake.lance.warehouse", warehousePath); + return conf; + } + + @BeforeAll + protected static void beforeAll() { + clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); + conn = ConnectionFactory.createConnection(clientConf); + admin = conn.getAdmin(); + } + + @BeforeEach + public void beforeEach() { + execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); + execEnv.setParallelism(2); + execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + @AfterAll + static void afterAll() 
throws Exception { + if (admin != null) { + admin.close(); + admin = null; + } + if (conn != null) { + conn.close(); + conn = null; + } + } + + protected static Map<String, String> getLanceCatalogConf() { + Map<String, String> lanceConf = new HashMap<>(); + lanceConf.put("warehouse", warehousePath); + return lanceConf; + } + + protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor) + throws Exception { + admin.createTable(tablePath, tableDescriptor, true).get(); + return admin.getTableInfo(tablePath).get().getTableId(); + } + + protected long createLogTable(TablePath tablePath) throws Exception { + return createLogTable(tablePath, 1); + } + + protected long createLogTable(TablePath tablePath, int bucketNum) throws Exception { + return createLogTable(tablePath, bucketNum, false); + } + + protected Replica getLeaderReplica(TableBucket tableBucket) { + return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket); + } + + protected void assertReplicaStatus(TableBucket tb, long expectedLogEndOffset) { + retry( + Duration.ofMinutes(1), + () -> { + Replica replica = getLeaderReplica(tb); + // datalake snapshot id should be updated + assertThat(replica.getLogTablet().getLakeTableSnapshotId()) + .isGreaterThanOrEqualTo(0); + assertThat(replica.getLakeLogEndOffset()).isEqualTo(expectedLogEndOffset); + }); + } + + protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned) + throws Exception { + Schema.Builder schemaBuilder = + Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING()); + + TableDescriptor.Builder tableBuilder = + TableDescriptor.builder() + .distributedBy(bucketNum, "a") + .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") + .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); + + if (isPartitioned) { + schemaBuilder.column("c", DataTypes.STRING()); + tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true); + 
tableBuilder.partitionedBy("c"); + tableBuilder.property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR); + } + tableBuilder.schema(schemaBuilder.build()); + return createTable(tablePath, tableBuilder.build()); + } + + protected void writeRows(TablePath tablePath, List<InternalRow> rows, boolean append) + throws Exception { + try (Table table = conn.getTable(tablePath)) { + TableWriter tableWriter; + if (append) { + tableWriter = table.newAppend().createWriter(); + } else { + tableWriter = table.newUpsert().createWriter(); + } + for (InternalRow row : rows) { + if (tableWriter instanceof AppendWriter) { + ((AppendWriter) tableWriter).append(row); + } else { + ((UpsertWriter) tableWriter).upsert(row); + } + } + tableWriter.flush(); + } + } + + protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception { + Configuration flussConfig = new Configuration(clientConf); + flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L)); + return LakeTieringJobBuilder.newBuilder( + execEnv, + flussConfig, + Configuration.fromMap(getLanceCatalogConf()), + DataLakeFormat.LANCE.toString()) + .build(); + } + + protected void waitUntilSnapshot(long tableId, int bucketNum, long snapshotId) { + for (int i = 0; i < bucketNum; i++) { + TableBucket tableBucket = new TableBucket(tableId, i); + FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, snapshotId); + } + } +} diff --git a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java new file mode 100644 index 000000000..ee54dfd92 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.tiering; + +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.lake.lance.LanceConfig; +import com.alibaba.fluss.lake.lance.testutils.FlinkLanceTieringTestBase; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.row.InternalRow; + +import com.lancedb.lance.Dataset; +import com.lancedb.lance.ReadOptions; +import com.lancedb.lance.Transaction; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY; +import static com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; +import static 
com.alibaba.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT case for tiering tables to lance. */ +class LanceTieringITCase extends FlinkLanceTieringTestBase { + protected static final String DEFAULT_DB = "fluss"; + private static StreamExecutionEnvironment execEnv; + private static Configuration lanceConf; + private static final RootAllocator allocator = new RootAllocator(); + + @BeforeAll + protected static void beforeAll() { + FlinkLanceTieringTestBase.beforeAll(); + execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + execEnv.setParallelism(2); + execEnv.enableCheckpointing(1000); + lanceConf = Configuration.fromMap(getLanceCatalogConf()); + } + + @Test + void testTiering() throws Exception { + // create log table + TablePath t1 = TablePath.of(DEFAULT_DB, "logTable"); + long t1Id = createLogTable(t1); + TableBucket t1Bucket = new TableBucket(t1Id, 0); + List<InternalRow> flussRows = new ArrayList<>(); + // write records + for (int i = 0; i < 10; i++) { + List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")); + flussRows.addAll(rows); + // write records + writeRows(t1, rows, true); + } + + // then start tiering job + JobClient jobClient = buildTieringJob(execEnv); + + // check the status of replica after synced; + // note: we can't update log start offset for unaware bucket mode log table + assertReplicaStatus(t1Bucket, 30); + + LanceConfig config = + LanceConfig.from( + lanceConf.toMap(), + Collections.emptyMap(), + t1.getDatabaseName(), + t1.getTableName()); + + // check data in lance + checkDataInLanceAppendOnlyTable(config, flussRows); + // check snapshot property in lance + Map<String, String> properties = + new HashMap<String, String>() { + { + put( + FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, + "[{\"bucket_id\":0,\"log_offset\":30}]"); + put("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER); + } + }; + checkSnapshotPropertyInLance(config, properties); + + 
jobClient.cancel().get(); + } + + private void checkSnapshotPropertyInLance( + LanceConfig config, Map<String, String> expectedProperties) throws Exception { + ReadOptions.Builder builder = new ReadOptions.Builder(); + builder.setStorageOptions(LanceConfig.genStorageOptions(config)); + try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) { + Transaction transaction = dataset.readTransaction().orElse(null); + assertThat(transaction).isNotNull(); + assertThat(transaction.transactionProperties()).isEqualTo(expectedProperties); + } + } + + private void checkDataInLanceAppendOnlyTable(LanceConfig config, List<InternalRow> expectedRows) + throws Exception { + try (Dataset dataset = + Dataset.open( + allocator, + config.getDatasetUri(), + LanceConfig.genReadOptionFromConfig(config))) { + ArrowReader reader = dataset.newScan().scanBatches(); + VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot(); + reader.loadNextBatch(); + Iterator<InternalRow> flussRowIterator = expectedRows.iterator(); + int rowCount = readerRoot.getRowCount(); + for (int i = 0; i < rowCount; i++) { + InternalRow flussRow = flussRowIterator.next(); + assertThat((int) (readerRoot.getVector(0).getObject(i))) + .isEqualTo(flussRow.getInt(0)); + assertThat(((VarCharVector) readerRoot.getVector(1)).getObject(i).toString()) + .isEqualTo(flussRow.getString(1).toString()); + } + assertThat(reader.loadNextBatch()).isFalse(); + assertThat(flussRowIterator.hasNext()).isFalse(); + } + } +} diff --git a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java new file mode 100644 index 000000000..60b7a330e --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.lance.tiering; + +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot; +import com.alibaba.fluss.lake.committer.LakeCommitter; +import com.alibaba.fluss.lake.lance.LanceConfig; +import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils; +import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter; +import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer; +import com.alibaba.fluss.lake.writer.LakeWriter; +import com.alibaba.fluss.lake.writer.WriterInitContext; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.GenericRecord; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.types.DataTypes; +import com.alibaba.fluss.utils.types.Tuple2; + +import com.lancedb.lance.Dataset; +import com.lancedb.lance.WriteParams; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import 
org.apache.arrow.vector.ipc.ArrowReader; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static com.alibaba.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty; +import static org.assertj.core.api.Assertions.assertThat; + +/** The UT for tiering to Lance via {@link LanceLakeTieringFactory}. */ +class LanceTieringTest { + private @TempDir File tempWarehouseDir; + private LanceLakeTieringFactory lanceLakeTieringFactory; + private Configuration configuration; + + @BeforeEach + void beforeEach() { + configuration = new Configuration(); + configuration.setString("warehouse", tempWarehouseDir.toString()); + lanceLakeTieringFactory = new LanceLakeTieringFactory(configuration); + } + + private static Stream<Arguments> tieringWriteArgs() { + return Stream.of(Arguments.of(false), Arguments.of(true)); + } + + @ParameterizedTest + @MethodSource("tieringWriteArgs") + void testTieringWriteTable(boolean isPartitioned) throws Exception { + int bucketNum = 3; + TablePath tablePath = TablePath.of("lance", "logTable"); + Map<String, String> customProperties = new HashMap<>(); + customProperties.put("lance.batch_size", "256"); + LanceConfig config = + LanceConfig.from( + configuration.toMap(), + customProperties, + tablePath.getDatabaseName(), + tablePath.getTableName()); + Schema schema = createTable(config); + + List<LanceWriteResult> lanceWriteResults = new ArrayList<>(); + SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer = + 
lanceLakeTieringFactory.getWriteResultSerializer(); + SimpleVersionedSerializer<LanceCommittable> committableSerializer = + lanceLakeTieringFactory.getCommittableSerializer(); + + try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter = + createLakeCommitter(tablePath)) { + // should no any missing snapshot + assertThat(lakeCommitter.getMissingLakeSnapshot(2L)).isNull(); + } + + Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>(); + Map<Long, String> partitionIdAndName = + isPartitioned + ? new HashMap<Long, String>() { + { + put(1L, "p1"); + put(2L, "p2"); + put(3L, "p3"); + } + } + : Collections.singletonMap(null, null); + List<String> partitionKeys = isPartitioned ? Arrays.asList("c3") : null; + Map<TableBucket, Long> tableBucketOffsets = new HashMap<>(); + // first, write data + for (int bucket = 0; bucket < bucketNum; bucket++) { + for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) { + String partition = entry.getValue(); + try (LakeWriter<LanceWriteResult> lakeWriter = + createLakeWriter(tablePath, bucket, partition, schema, customProperties)) { + Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket); + Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords = + genLogTableRecords(partition, bucket, 10); + List<LogRecord> writtenRecords = writeAndExpectRecords.f0; + List<LogRecord> expectRecords = writeAndExpectRecords.f1; + recordsByBucket.put(partitionBucket, expectRecords); + tableBucketOffsets.put(new TableBucket(0, entry.getKey(), bucket), 10L); + for (LogRecord logRecord : writtenRecords) { + lakeWriter.write(logRecord); + } + // serialize/deserialize writeResult + LanceWriteResult lanceWriteResult = lakeWriter.complete(); + byte[] serialized = writeResultSerializer.serialize(lanceWriteResult); + lanceWriteResults.add( + writeResultSerializer.deserialize( + writeResultSerializer.getVersion(), serialized)); + } + } + } + + // second, commit data + try 
(LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter = + createLakeCommitter(tablePath)) { + // serialize/deserialize committable + LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults); + byte[] serialized = committableSerializer.serialize(lanceCommittable); + lanceCommittable = + committableSerializer.deserialize( + committableSerializer.getVersion(), serialized); + long snapshot = + lakeCommitter.commit( + lanceCommittable, + toBucketOffsetsProperty( + tableBucketOffsets, partitionIdAndName, partitionKeys)); + // lance dataset version starts from 1 + assertThat(snapshot).isEqualTo(2); + } + + try (Dataset dataset = + Dataset.open( + new RootAllocator(), + config.getDatasetUri(), + LanceConfig.genReadOptionFromConfig(config))) { + ArrowReader reader = dataset.newScan().scanBatches(); + VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot(); + + // then, check data + for (int bucket = 0; bucket < 3; bucket++) { + for (String partition : partitionIdAndName.values()) { + reader.loadNextBatch(); + Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket); + List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket); + verifyLogTableRecords( + readerRoot, expectRecords, bucket, isPartitioned, partition); + } + } + assertThat(reader.loadNextBatch()).isFalse(); + } + + // then, let's verify getMissingLakeSnapshot works + try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter = + createLakeCommitter(tablePath)) { + // use snapshot id 1 as the known snapshot id + CommittedLakeSnapshot committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(1L); + assertThat(committedLakeSnapshot).isNotNull(); + Map<Tuple2<Long, Integer>, Long> offsets = committedLakeSnapshot.getLogEndOffsets(); + for (int bucket = 0; bucket < 3; bucket++) { + for (Long partitionId : partitionIdAndName.keySet()) { + // we only write 10 records, so expected log offset should be 10 + 
assertThat(offsets.get(Tuple2.of(partitionId, bucket))).isEqualTo(10); + } + } + assertThat(committedLakeSnapshot.getLakeSnapshotId()).isEqualTo(2L); + + // use null as the known snapshot id + CommittedLakeSnapshot committedLakeSnapshot2 = + lakeCommitter.getMissingLakeSnapshot(null); + assertThat(committedLakeSnapshot2).isEqualTo(committedLakeSnapshot); + + // use snapshot id 2 as the known snapshot id + committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(2L); + // no any missing committed offset since the latest snapshot is 2L + assertThat(committedLakeSnapshot).isNull(); + } + } + + private void verifyLogTableRecords( + VectorSchemaRoot root, + List<LogRecord> expectRecords, + int expectBucket, + boolean isPartitioned, + @Nullable String partition) + throws Exception { + assertThat(root.getRowCount()).isEqualTo(expectRecords.size()); + for (int i = 0; i < expectRecords.size(); i++) { + LogRecord expectRecord = expectRecords.get(i); + // check business columns: + assertThat((int) (root.getVector(0).getObject(i))) + .isEqualTo(expectRecord.getRow().getInt(0)); + assertThat(((VarCharVector) root.getVector(1)).getObject(i).toString()) + .isEqualTo(expectRecord.getRow().getString(1).toString()); + assertThat(((VarCharVector) root.getVector(2)).getObject(i).toString()) + .isEqualTo(expectRecord.getRow().getString(2).toString()); + } + } + + private LakeCommitter<LanceWriteResult, LanceCommittable> createLakeCommitter( + TablePath tablePath) throws IOException { + return lanceLakeTieringFactory.createLakeCommitter(() -> tablePath); + } + + private LakeWriter<LanceWriteResult> createLakeWriter( + TablePath tablePath, + int bucket, + @Nullable String partition, + Schema schema, + Map<String, String> customProperties) + throws IOException { + return lanceLakeTieringFactory.createLakeWriter( + new WriterInitContext() { + @Override + public TablePath tablePath() { + return tablePath; + } + + @Override + public TableBucket tableBucket() { + // don't care about 
tableId & partitionId + return new TableBucket(0, 0L, bucket); + } + + @Nullable + @Override + public String partition() { + return partition; + } + + @Override + public com.alibaba.fluss.metadata.Schema schema() { + return schema; + } + + @Override + public Map<String, String> customProperties() { + return customProperties; + } + }); + } + + private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords( + @Nullable String partition, int bucket, int numRecords) { + List<LogRecord> logRecords = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + GenericRow genericRow; + if (partition != null) { + // Partitioned table: include partition field in data + genericRow = new GenericRow(3); // c1, c2, c3(partition) + genericRow.setField(0, i); + genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i)); + genericRow.setField(2, BinaryString.fromString(partition)); // partition field + } else { + // Non-partitioned table + genericRow = new GenericRow(3); + genericRow.setField(0, i); + genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i)); + genericRow.setField(2, BinaryString.fromString("bucket" + bucket)); + } + LogRecord logRecord = + new GenericRecord( + i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow); + logRecords.add(logRecord); + } + return Tuple2.of(logRecords, logRecords); + } + + private Schema createTable(LanceConfig config) throws Exception { + List<Schema.Column> columns = new ArrayList<>(); + columns.add(new Schema.Column("c1", DataTypes.INT())); + columns.add(new Schema.Column("c2", DataTypes.STRING())); + columns.add(new Schema.Column("c3", DataTypes.STRING())); + Schema.Builder schemaBuilder = Schema.newBuilder().fromColumns(columns); + Schema schema = schemaBuilder.build(); + WriteParams params = LanceConfig.genWriteParamsFromConfig(config); + LanceDatasetAdapter.createDataset( + config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params); + + return schema; 
+ } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java index 3dcfc826d..fe2933906 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java @@ -126,7 +126,6 @@ public class FlinkPaimonTieringTestBase { execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); execEnv.setParallelism(2); - execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); } protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {