This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-array-flink in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 6de9f65b485e847a6232f9f2f5dd19a9d278d331 Author: forwardxu <[email protected]> AuthorDate: Thu Nov 27 14:51:46 2025 +0800 [flink] Support Array type in Flink connector --- .../apache/fluss/record/DefaultLogRecordBatch.java | 25 ++- .../org/apache/fluss/record/LogRecordBatch.java | 8 + .../apache/fluss/record/LogRecordReadContext.java | 26 +++ .../org/apache/fluss/row/arrow/ArrowReader.java | 3 +- .../org/apache/fluss/row/arrow/ArrowWriter.java | 22 +- .../fluss/row/arrow/writers/ArrowArrayWriter.java | 29 +++ .../fluss/row/arrow/writers/ArrowFieldWriter.java | 5 + .../java/org/apache/fluss/utils/ArrowUtils.java | 92 +++++++- .../fluss/row/arrow/ArrowReaderWriterTest.java | 9 +- .../flink/sink/Flink118ComplexTypeITCase.java | 21 ++ .../flink/sink/Flink119ComplexTypeITCase.java | 21 ++ .../flink/sink/Flink120ComplexTypeITCase.java | 21 ++ .../fluss/flink/sink/Flink21ComplexTypeITCase.java | 21 ++ .../apache/fluss/flink/utils/FlinkConversions.java | 8 +- .../fluss/flink/utils/FlinkSchemaValidator.java | 122 +++++++++++ .../fluss/flink/sink/FlinkComplexTypeITCase.java | 231 +++++++++++++++++++++ .../flink/utils/FlinkSchemaValidatorTest.java | 229 ++++++++++++++++++++ 17 files changed, 878 insertions(+), 15 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java index eaacc48a6..2c6bce911 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java @@ -223,7 +223,8 @@ public class DefaultLogRecordBatch implements LogRecordBatch { rowType, context.getVectorSchemaRoot(schemaId), context.getBufferAllocator(), - timestamp); + timestamp, + context); case INDEXED: return rowRecordIterator(rowType, timestamp); default: @@ -279,7 +280,11 @@ public class DefaultLogRecordBatch implements LogRecordBatch { } private 
CloseableIterator<LogRecord> columnRecordIterator( - RowType rowType, VectorSchemaRoot root, BufferAllocator allocator, long timestamp) { + RowType rowType, + VectorSchemaRoot root, + BufferAllocator allocator, + long timestamp, + ReadContext readContext) { boolean isAppendOnly = (attributes() & APPEND_ONLY_FLAG_MASK) > 0; if (isAppendOnly) { // append only batch, no change type vector, @@ -289,7 +294,13 @@ public class DefaultLogRecordBatch implements LogRecordBatch { int arrowLength = sizeInBytes() - recordBatchHeaderSize; ArrowReader reader = ArrowUtils.createArrowReader( - segment, arrowOffset, arrowLength, root, allocator, rowType); + segment, + arrowOffset, + arrowLength, + root, + allocator, + rowType, + readContext); return new ArrowLogRecordIterator(reader, timestamp) { @Override protected ChangeType getChangeType(int rowId) { @@ -307,7 +318,13 @@ public class DefaultLogRecordBatch implements LogRecordBatch { sizeInBytes() - arrowChangeTypeOffset(magic) - changeTypeVector.sizeInBytes(); ArrowReader reader = ArrowUtils.createArrowReader( - segment, arrowOffset, arrowLength, root, allocator, rowType); + segment, + arrowOffset, + arrowLength, + root, + allocator, + rowType, + readContext); return new ArrowLogRecordIterator(reader, timestamp) { @Override protected ChangeType getChangeType(int rowId) { diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java index cadc1ef9d..c226f25a4 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java @@ -195,5 +195,13 @@ public interface LogRecordBatch { /** Gets the buffer allocator. */ BufferAllocator getBufferAllocator(); + + /** + * Registers a batch VectorSchemaRoot to be closed when this context is closed. This is used + * to track VectorSchemaRoots created for individual batches. 
+ * + * @param batchRoot the batch VectorSchemaRoot to register + */ + void registerBatchRoot(VectorSchemaRoot batchRoot); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 1158dbc35..08ef69037 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -32,6 +32,7 @@ import org.apache.fluss.utils.Projection; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.List; import java.util.stream.IntStream; @@ -55,6 +56,8 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo private final FieldGetter[] selectedFieldGetters; // whether the projection is push downed to the server side and the returned data is pruned. private final boolean projectionPushDowned; + // track all batch VectorSchemaRoots created for this context + private final List<VectorSchemaRoot> batchRoots; /** * Creates a LogRecordReadContext for the given table information and projection information. @@ -164,6 +167,7 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo this.bufferAllocator = bufferAllocator; this.selectedFieldGetters = selectedFieldGetters; this.projectionPushDowned = projectionPushDowned; + this.batchRoots = new ArrayList<>(); } @Override @@ -215,7 +219,29 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo return bufferAllocator; } + /** + * Registers a batch VectorSchemaRoot to be closed when this context is closed. This is used to + * track VectorSchemaRoots created for individual batches. 
+ */ + @Override + public void registerBatchRoot(VectorSchemaRoot batchRoot) { + if (batchRoot != null) { + batchRoots.add(batchRoot); + } + } + public void close() { + // Close all batch roots first + for (VectorSchemaRoot root : batchRoots) { + try { + root.close(); + } catch (Exception e) { + // Continue closing other roots even if one fails + } + } + batchRoots.clear(); + + // Then close the shared schema root and allocator if (vectorSchemaRoot != null) { vectorSchemaRoot.close(); } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java index ea5afe930..d50eb8fe0 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java @@ -58,6 +58,7 @@ public class ArrowReader { } public void close() { - root.close(); + // Do not close the VectorSchemaRoot here. + // The root will be closed when LogRecordReadContext closes. 
} } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java index 9b0ab0081..8c1ba501b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java @@ -30,6 +30,7 @@ import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVe import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorUnloader; +import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionCodec; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionUtil; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.ipc.WriteChannel; @@ -181,7 +182,10 @@ public class ArrowWriter implements AutoCloseable { for (int i = 0; i < fieldWriters.length; i++) { FieldVector fieldVector = root.getVector(i); initFieldVector(fieldVector); - fieldWriters[i] = ArrowUtils.createArrowFieldWriter(fieldVector, schema.getTypeAt(i)); + } + // Reset field writers to clear their offset counters (for ArrayWriter) + for (ArrowFieldWriter fieldWriter : fieldWriters) { + resetFieldWriter(fieldWriter); } root.setRowCount(0); recordsCount = 0; @@ -285,6 +289,10 @@ public class ArrowWriter implements AutoCloseable { if (this.epoch == epoch) { root.clear(); recordsCount = 0; + // Reset array writers when recycling + for (ArrowFieldWriter fieldWriter : fieldWriters) { + resetFieldWriter(fieldWriter); + } provider.recycleWriter(this); } } @@ -302,11 +310,23 @@ public class ArrowWriter implements AutoCloseable { ((BaseFixedWidthVector) fieldVector).allocateNew(INITIAL_CAPACITY); } else if (fieldVector instanceof 
BaseVariableWidthVector) { ((BaseVariableWidthVector) fieldVector).allocateNew(INITIAL_CAPACITY); + } else if (fieldVector instanceof ListVector) { + ListVector listVector = (ListVector) fieldVector; + listVector.allocateNew(); + FieldVector dataVector = listVector.getDataVector(); + if (dataVector != null) { + initFieldVector(dataVector); + } } else { fieldVector.allocateNew(); } } + /** Resets field writers to clear their state for reuse. */ + private void resetFieldWriter(ArrowFieldWriter fieldWriter) { + fieldWriter.reset(); + } + private int estimatedBytesWritten(int currentBytes) { if (compressionCodec.getCodecType() == CompressionUtil.CodecType.NO_COMPRESSION) { return currentBytes; diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java index 61427cb06..c5001af56 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java @@ -45,5 +45,34 @@ public class ArrowArrayWriter extends ArrowFieldWriter { } offset += array.size(); listVector.endValue(rowIndex, array.size()); + + // Fix dataVector valueCount after writing each row + // This ensures VectorUnloader has correct valueCount for all vectors + FieldVector dataVector = listVector.getDataVector(); + if (dataVector != null && offset > 0) { + dataVector.setValueCount(offset); + // Recursively fix nested ListVectors + fixNestedListVectorValueCount(dataVector, offset); + } + } + + private void fixNestedListVectorValueCount(FieldVector vector, int parentCount) { + if (vector instanceof ListVector && parentCount > 0) { + ListVector listVector = (ListVector) vector; + int dataVectorValueCount = listVector.getElementEndIndex(parentCount - 1); + FieldVector dataVector = listVector.getDataVector(); + if (dataVector != null && dataVectorValueCount > 0) { + 
dataVector.setValueCount(dataVectorValueCount); + fixNestedListVectorValueCount(dataVector, dataVectorValueCount); + } + } + } + + /** Resets the offset counter for reuse. */ + @Override + public void reset() { + super.reset(); + elementWriter.reset(); + offset = 0; } } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowFieldWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowFieldWriter.java index bb020196f..8dd16c64b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowFieldWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowFieldWriter.java @@ -59,4 +59,9 @@ public abstract class ArrowFieldWriter { doWrite(rowIndex, getters, ordinal, handleSafe); } } + + /** Resets the state of the writer to write the next batch of fields. */ + public void reset() { + fieldVector.reset(); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java index 72da2556c..8e9ab3e19 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java @@ -159,28 +159,110 @@ public class ArrowUtils { MemorySegment segment, int arrowOffset, int arrowLength, - VectorSchemaRoot schemaRoot, + VectorSchemaRoot sharedSchemaRoot, BufferAllocator allocator, - RowType rowType) { + RowType rowType, + org.apache.fluss.record.LogRecordBatch.ReadContext readContext) { ByteBuffer arrowBatchBuffer = segment.wrap(arrowOffset, arrowLength); + try (ReadChannel channel = new ReadChannel(new ByteBufferReadableChannel(arrowBatchBuffer)); ArrowRecordBatch batch = deserializeRecordBatch(channel, allocator)) { + + // Create a new VectorSchemaRoot for each batch to avoid data contamination + // when the shared root is reused for subsequent batches + VectorSchemaRoot batchSchemaRoot = + 
VectorSchemaRoot.create(sharedSchemaRoot.getSchema(), allocator); + VectorLoader vectorLoader = - new VectorLoader(schemaRoot, ArrowCompressionFactory.INSTANCE); + new VectorLoader(batchSchemaRoot, ArrowCompressionFactory.INSTANCE); vectorLoader.load(batch); + + // Fix buffer writerIndex after VectorLoader.load + // VectorLoader.load() sets the capacity but not the writerIndex for buffers. + // This is especially critical for Array types (ListVector), where we need to: + // 1. Fix writerIndex for all buffers to match their capacity + // 2. Fix nested dataVector's valueCount based on the last offset + // This was not needed before Array type support, but is now essential + // for correct Arrow data reading with complex nested types. + fixVectorBuffers(batchSchemaRoot); + List<ColumnVector> columnVectors = new ArrayList<>(); - List<FieldVector> fieldVectors = schemaRoot.getFieldVectors(); + List<FieldVector> fieldVectors = batchSchemaRoot.getFieldVectors(); for (int i = 0; i < fieldVectors.size(); i++) { columnVectors.add( createArrowColumnVector(fieldVectors.get(i), rowType.getTypeAt(i))); } - return new ArrowReader(schemaRoot, columnVectors.toArray(new ColumnVector[0])); + + // Register the batch root with the read context so it can be closed later + if (readContext != null) { + readContext.registerBatchRoot(batchSchemaRoot); + } + + return new ArrowReader(batchSchemaRoot, columnVectors.toArray(new ColumnVector[0])); } catch (IOException e) { throw new RuntimeException("Failed to deserialize ArrowRecordBatch.", e); } } + /** + * Fixes writerIndex for all buffers in all vectors after VectorLoader.load(). + * + * <p>VectorLoader.load() sets the capacity but not the writerIndex for buffers, which can cause + * issues when reading data. 
This method recursively fixes all vectors: + * + * <ul> + * <li>For all vectors: sets writerIndex to match capacity for proper buffer reading + * <li>For ListVector (Array type): additionally fixes the nested dataVector's valueCount + * based on the last offset, and recursively fixes nested structures + * </ul> + * + * <p>This is especially critical for Array types introduced in the system, where nested + * dataVectors need correct valueCount to avoid data corruption or reading errors. + */ + private static void fixVectorBuffers(VectorSchemaRoot schemaRoot) { + for (FieldVector fieldVector : schemaRoot.getFieldVectors()) { + fixVectorBuffersRecursive(fieldVector); + } + } + + private static void fixVectorBuffersRecursive(FieldVector vector) { + // Fix all buffers in this vector + for (ArrowBuf buf : vector.getFieldBuffers()) { + if (buf.capacity() > 0 && buf.writerIndex() < buf.capacity()) { + buf.writerIndex((int) buf.capacity()); + } + } + + // Special handling for ListVector: fix dataVector valueCount and buffers + // This is critical for Array type support. Without this, the nested dataVector + // may have incorrect valueCount, leading to data corruption or reading errors. 
+ if (vector instanceof ListVector) { + ListVector listVector = (ListVector) vector; + int vectorValueCount = listVector.getValueCount(); + + if (vectorValueCount > 0) { + FieldVector dataVector = listVector.getDataVector(); + if (dataVector != null) { + // Calculate the correct valueCount for dataVector from the last offset + // For array [a,b,c] followed by [d,e], offsets are [0,3,5], so dataVector + // needs valueCount=5 (not the parent's valueCount=2) + int dataVectorValueCount = listVector.getElementEndIndex(vectorValueCount - 1); + if (dataVectorValueCount > 0) { + dataVector.setValueCount(dataVectorValueCount); + } + // Recursively fix the dataVector (needed for nested arrays) + fixVectorBuffersRecursive(dataVector); + } + } + } else if (vector.getChildrenFromFields() != null) { + // Recursively fix other child vectors + for (FieldVector child : vector.getChildrenFromFields()) { + fixVectorBuffersRecursive(child); + } + } + } + /** * Serialize metadata of a {@link ArrowRecordBatch} into write channel. This avoids to create an * instance of {@link ArrowRecordBatch}. 
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java index 039032a1e..295d6272a 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java @@ -21,6 +21,7 @@ import org.apache.fluss.memory.AbstractPagedOutputView; import org.apache.fluss.memory.ManagedPagedOutputView; import org.apache.fluss.memory.MemorySegment; import org.apache.fluss.memory.TestingMemorySegmentPool; +import org.apache.fluss.record.LogRecordReadContext; import org.apache.fluss.row.Decimal; import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.GenericRow; @@ -177,7 +178,9 @@ class ArrowReaderWriterTest { ArrowWriterPool provider = new ArrowWriterPool(allocator); ArrowWriter writer = provider.getOrCreateWriter( - 1L, 1, Integer.MAX_VALUE, rowType, NO_COMPRESSION)) { + 1L, 1, Integer.MAX_VALUE, rowType, NO_COMPRESSION); + LogRecordReadContext readContext = + LogRecordReadContext.createArrowReadContext(rowType, 0)) { for (InternalRow row : TEST_DATA) { writer.writeRow(row); } @@ -197,7 +200,8 @@ class ArrowReaderWriterTest { firstSegment.copyTo(arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), segment, 0, size); ArrowReader reader = - ArrowUtils.createArrowReader(segment, 0, size, root, allocator, rowType); + ArrowUtils.createArrowReader( + segment, 0, size, root, allocator, rowType, readContext); int rowCount = reader.getRowCount(); for (int i = 0; i < rowCount; i++) { ColumnarRow row = reader.read(i); @@ -238,7 +242,6 @@ class ArrowReaderWriterTest { assertThat(row.getTimestampLtz(20, 6)).isEqualTo(rowData.getTimestampLtz(20, 6)); assertThat(row.getTimestampLtz(21, 9)).isEqualTo(rowData.getTimestampLtz(21, 9)); } - reader.close(); } } diff --git 
a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118ComplexTypeITCase.java b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118ComplexTypeITCase.java new file mode 100644 index 000000000..946b4abb6 --- /dev/null +++ b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118ComplexTypeITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink; + +/** Integration tests for Array type support in Flink 1.18. */ +public class Flink118ComplexTypeITCase extends FlinkComplexTypeITCase {} diff --git a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119ComplexTypeITCase.java b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119ComplexTypeITCase.java new file mode 100644 index 000000000..f25e943d9 --- /dev/null +++ b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119ComplexTypeITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink; + +/** Integration tests for Array type support in Flink 1.19. */ +public class Flink119ComplexTypeITCase extends FlinkComplexTypeITCase {} diff --git a/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/sink/Flink120ComplexTypeITCase.java b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/sink/Flink120ComplexTypeITCase.java new file mode 100644 index 000000000..86781efaf --- /dev/null +++ b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/sink/Flink120ComplexTypeITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink; + +/** Integration tests for Array type support in Flink 1.20. */ +public class Flink120ComplexTypeITCase extends FlinkComplexTypeITCase {} diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java new file mode 100644 index 000000000..7b2ed44b2 --- /dev/null +++ b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink; + +/** Integration tests for Array type support in Flink 2.1. 
*/ +public class Flink21ComplexTypeITCase extends FlinkComplexTypeITCase {} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java index 3256b3f52..1517fd0d5 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java @@ -190,7 +190,9 @@ public class FlinkConversions { // now, build Fluss's table Schema.Builder schemBuilder = Schema.newBuilder(); if (resolvedSchema.getPrimaryKey().isPresent()) { - schemBuilder.primaryKey(resolvedSchema.getPrimaryKey().get().getColumns()); + List<String> primaryKeyColumns = resolvedSchema.getPrimaryKey().get().getColumns(); + FlinkSchemaValidator.validatePrimaryKeyColumns(resolvedSchema, primaryKeyColumns); + schemBuilder.primaryKey(primaryKeyColumns); } // first build schema with physical columns @@ -223,6 +225,8 @@ public class FlinkConversions { ? 
((ResolvedCatalogTable) catalogBaseTable).getPartitionKeys() : ((ResolvedCatalogMaterializedTable) catalogBaseTable).getPartitionKeys(); + FlinkSchemaValidator.validatePartitionKeyColumns(resolvedSchema, partitionKeys); + Map<String, String> customProperties = flinkTableConf.toMap(); CatalogPropertiesUtils.serializeComputedColumns( customProperties, resolvedSchema.getColumns()); @@ -253,6 +257,8 @@ public class FlinkConversions { Arrays.stream(flinkTableConf.get(BUCKET_KEY).split(",")) .map(String::trim) .collect(Collectors.toList()); + + FlinkSchemaValidator.validateBucketKeyColumns(resolvedSchema, bucketKey); } else { // use primary keys - partition keys bucketKey = diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java new file mode 100644 index 000000000..68cbb1b65 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.utils; + +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; + +import java.util.List; + +/** Validator for Flink schema constraints. */ +public class FlinkSchemaValidator { + + private FlinkSchemaValidator() {} + + /** + * Validates that primary key columns do not contain ARRAY type. + * + * @param resolvedSchema the resolved schema + * @param primaryKeyColumns the list of primary key column names + * @throws CatalogException if a primary key column is not found in schema + * @throws UnsupportedOperationException if a primary key column is of ARRAY type + */ + public static void validatePrimaryKeyColumns( + ResolvedSchema resolvedSchema, List<String> primaryKeyColumns) { + for (String pkColumn : primaryKeyColumns) { + Column column = + resolvedSchema + .getColumn(pkColumn) + .orElseThrow( + () -> + new CatalogException( + "Primary key column " + + pkColumn + + " not found in schema.")); + LogicalType logicalType = column.getDataType().getLogicalType(); + if (logicalType instanceof ArrayType) { + throw new UnsupportedOperationException( + String.format( + "Column '%s' of ARRAY type is not supported as primary key.", + pkColumn)); + } + } + } + + /** + * Validates that partition key columns do not contain ARRAY type. 
+ * + * @param resolvedSchema the resolved schema + * @param partitionKeyColumns the list of partition key column names + * @throws CatalogException if a partition key column is not found in schema + * @throws UnsupportedOperationException if a partition key column is of ARRAY type + */ + public static void validatePartitionKeyColumns( + ResolvedSchema resolvedSchema, List<String> partitionKeyColumns) { + for (String partitionKey : partitionKeyColumns) { + Column column = + resolvedSchema + .getColumn(partitionKey) + .orElseThrow( + () -> + new CatalogException( + "Partition key column " + + partitionKey + + " not found in schema.")); + LogicalType logicalType = column.getDataType().getLogicalType(); + if (logicalType instanceof ArrayType) { + throw new UnsupportedOperationException( + String.format( + "Column '%s' of ARRAY type is not supported as partition key.", + partitionKey)); + } + } + } + + /** + * Validates that bucket key columns do not contain ARRAY type. + * + * @param resolvedSchema the resolved schema + * @param bucketKeyColumns the list of bucket key column names + * @throws CatalogException if a bucket key column is not found in schema + * @throws UnsupportedOperationException if a bucket key column is of ARRAY type + */ + public static void validateBucketKeyColumns( + ResolvedSchema resolvedSchema, List<String> bucketKeyColumns) { + for (String bkColumn : bucketKeyColumns) { + Column column = + resolvedSchema + .getColumn(bkColumn) + .orElseThrow( + () -> + new CatalogException( + "Bucket key column " + + bkColumn + + " not found in schema.")); + LogicalType logicalType = column.getDataType().getLogicalType(); + if (logicalType instanceof ArrayType) { + throw new UnsupportedOperationException( + String.format( + "Column '%s' of ARRAY type is not supported as bucket key.", + bkColumn)); + } + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java new file mode 100644 index 000000000..13d91bcd9 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.sink; + +import org.apache.fluss.server.testutils.FlussClusterExtension; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Integration tests for Array type support in Flink connector. 
abstract class FlinkComplexTypeITCase extends AbstractTestBase {

    // Shared mini Fluss cluster for all tests in this class; 3 tablet servers so that
    // bucket.num = 3 tables spread across distinct servers.
    @RegisterExtension
    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
            FlussClusterExtension.builder().setNumOfTabletServers(3).build();

    static final String CATALOG_NAME = "testcatalog";
    static final String DEFAULT_DB = "defaultdb";

    protected StreamExecutionEnvironment env;
    protected StreamTableEnvironment tEnv;
    // Batch-mode environment registered against the same catalog/database. NOTE(review):
    // no test in this class currently queries through tBatchEnv — presumably kept for
    // version-specific subclasses or future batch-read coverage; verify before removing.
    protected TableEnvironment tBatchEnv;

    /**
     * Creates streaming and batch table environments, registers the Fluss catalog in both,
     * and creates/uses a fresh {@code defaultdb} database for each test.
     */
    @BeforeEach
    void before() {
        String bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers();

        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
                String.format(
                        "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
        tEnv.executeSql("use catalog " + CATALOG_NAME);
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);

        tBatchEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tBatchEnv.executeSql(
                String.format(
                        "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
        tBatchEnv.executeSql("use catalog " + CATALOG_NAME);
        tBatchEnv
                .getConfig()
                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);

        // Database is created once via the streaming env; both envs share the catalog,
        // so the batch env only needs to switch to it.
        tEnv.executeSql("create database " + DEFAULT_DB);
        tEnv.useDatabase(DEFAULT_DB);
        tBatchEnv.useDatabase(DEFAULT_DB);
    }

    /** Drops the per-test database (cascade removes any tables the test created). */
    @AfterEach
    void after() {
        tEnv.useDatabase(BUILTIN_DATABASE);
        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
    }

    /**
     * Round-trips arrays of every supported element type (including nulls, null elements,
     * and arrays nested up to three levels) through an append-only log table and checks
     * the string form of the emitted rows.
     */
    @Test
    void testArrayTypesInLogTable() throws Exception {
        tEnv.executeSql(
                "create table array_log_test ("
                        + "id int, "
                        + "int_array array<int>, "
                        + "bigint_array array<bigint>, "
                        + "float_array array<float>, "
                        + "double_array array<double>, "
                        + "string_array array<string>, "
                        + "boolean_array array<boolean>, "
                        + "nested_int_array array<array<int>>, "
                        + "nested_string_array array<array<string>>, "
                        + "deeply_nested_array array<array<array<int>>>"
                        + ") with ('bucket.num' = '3')");

        // Row 1 exercises null elements at every nesting level; row 2 exercises a
        // top-level NULL array. Explicit CASTs pin the element types for the planner.
        tEnv.executeSql(
                        "INSERT INTO array_log_test VALUES "
                                + "(1, ARRAY[1, 2, CAST(NULL AS INT)], ARRAY[100, CAST(NULL AS BIGINT), 300], "
                                + "ARRAY[CAST(1.1 AS FLOAT), CAST(NULL AS FLOAT)], ARRAY[2.2, 3.3, CAST(NULL AS DOUBLE)], "
                                + "ARRAY['a', CAST(NULL AS STRING), 'c'], ARRAY[true, CAST(NULL AS BOOLEAN), false], "
                                + "ARRAY[ARRAY[1, 2], CAST(NULL AS ARRAY<INT>), ARRAY[3]], "
                                + "ARRAY[ARRAY['x'], ARRAY[CAST(NULL AS STRING), 'y']], "
                                + "ARRAY[ARRAY[ARRAY[1, 2]], ARRAY[ARRAY[3, 4, 5]]]), "
                                + "(2, CAST(NULL AS ARRAY<INT>), ARRAY[400, 500], "
                                + "ARRAY[CAST(4.4 AS FLOAT)], ARRAY[5.5], "
                                + "ARRAY['d', 'e'], ARRAY[true], "
                                + "ARRAY[ARRAY[6, 7, 8]], ARRAY[ARRAY['z']], "
                                + "ARRAY[ARRAY[ARRAY[9]]])")
                .await();

        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from array_log_test").collect();
        List<String> expectedRows =
                Arrays.asList(
                        "+I[1, [1, 2, null], [100, null, 300], [1.1, null], [2.2, 3.3, null], [a, null, c], [true, null, false], [[1, 2], null, [3]], [[x], [null, y]], [[[1, 2]], [[3, 4, 5]]]]",
                        "+I[2, null, [400, 500], [4.4], [5.5], [d, e], [true], [[6, 7, 8]], [[z]], [[[9]]]]");
        // NOTE(review): third argument presumably tells the helper to close rowIter
        // after asserting — confirm against FlinkRowAssertionsUtils.
        assertResultsIgnoreOrder(rowIter, expectedRows, true);
    }

    /**
     * Verifies array columns on a primary-key table: initial inserts produce +I rows,
     * then an upsert of key 1 produces a -U/+U changelog pair while key 4 appends +I.
     */
    @Test
    void testArrayTypesInPrimaryKeyTable() throws Exception {
        tEnv.executeSql(
                "create table array_pk_test ("
                        + "id int, "
                        + "int_array array<int>, "
                        + "bigint_array array<bigint>, "
                        + "float_array array<float>, "
                        + "double_array array<double>, "
                        + "string_array array<string>, "
                        + "boolean_array array<boolean>, "
                        + "nested_int_array array<array<int>>, "
                        + "nested_string_array array<array<string>>, "
                        + "primary key(id) not enforced"
                        + ") with ('bucket.num' = '3')");

        tEnv.executeSql(
                        "INSERT INTO array_pk_test VALUES "
                                + "(1, ARRAY[1, 2], ARRAY[100, 300], ARRAY[CAST(1.1 AS FLOAT)], ARRAY[2.2, 3.3], "
                                + "ARRAY['a', CAST(NULL AS STRING), 'c'], ARRAY[true, false], "
                                + "ARRAY[ARRAY[1, 2], CAST(NULL AS ARRAY<INT>), ARRAY[3]], "
                                + "ARRAY[ARRAY['x'], ARRAY[CAST(NULL AS STRING), 'y']]), "
                                + "(2, CAST(NULL AS ARRAY<INT>), ARRAY[400, 500], ARRAY[CAST(4.4 AS FLOAT)], ARRAY[5.5], "
                                + "ARRAY['d', 'e'], ARRAY[true], ARRAY[ARRAY[6, 7, 8]], ARRAY[ARRAY['z']]), "
                                + "(3, ARRAY[10], ARRAY[600], ARRAY[CAST(7.7 AS FLOAT)], ARRAY[8.8], "
                                + "ARRAY['f'], ARRAY[false], ARRAY[ARRAY[9]], ARRAY[ARRAY['w']])")
                .await();

        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from array_pk_test").collect();
        List<String> expectedRows =
                Arrays.asList(
                        "+I[1, [1, 2], [100, 300], [1.1], [2.2, 3.3], [a, null, c], [true, false], [[1, 2], null, [3]], [[x], [null, y]]]",
                        "+I[2, null, [400, 500], [4.4], [5.5], [d, e], [true], [[6, 7, 8]], [[z]]]",
                        "+I[3, [10], [600], [7.7], [8.8], [f], [false], [[9]], [[w]]]");
        // 'false' keeps rowIter open: the same streaming scan is asserted again below
        // after the upsert batch.
        assertResultsIgnoreOrder(rowIter, expectedRows, false);

        // Upsert key 1 (update) and key 4 (new insert) to exercise changelog emission.
        tEnv.executeSql(
                        "INSERT INTO array_pk_test VALUES "
                                + "(1, ARRAY[100, 200], ARRAY[1000], ARRAY[CAST(10.1 AS FLOAT)], ARRAY[11.1], "
                                + "ARRAY['updated'], ARRAY[false], ARRAY[ARRAY[100]], ARRAY[ARRAY['updated']]), "
                                + "(4, ARRAY[20, 30], ARRAY[2000, 3000], ARRAY[CAST(20.2 AS FLOAT)], ARRAY[30.3], "
                                + "ARRAY['new'], ARRAY[true], ARRAY[ARRAY[200], ARRAY[300]], ARRAY[ARRAY['new1'], ARRAY['new2']])")
                .await();

        expectedRows =
                Arrays.asList(
                        "-U[1, [1, 2], [100, 300], [1.1], [2.2, 3.3], [a, null, c], [true, false], [[1, 2], null, [3]], [[x], [null, y]]]",
                        "+U[1, [100, 200], [1000], [10.1], [11.1], [updated], [false], [[100]], [[updated]]]",
                        "+I[4, [20, 30], [2000, 3000], [20.2], [30.3], [new], [true], [[200], [300]], [[new1], [new2]]]");
        assertResultsIgnoreOrder(rowIter, expectedRows, true);
    }

    /** Array columns must be rejected as partition keys at table-creation time. */
    @Test
    void testArrayTypeAsPartitionKeyThrowsException() {
        // .cause() unwraps the Flink ValidationException/TableException wrapper around
        // the connector's UnsupportedOperationException.
        assertThatThrownBy(
                        () ->
                                tEnv.executeSql(
                                        "create table array_partition_test ("
                                                + "id int, "
                                                + "data string, "
                                                + "tags array<string>, "
                                                + "primary key(id) not enforced"
                                                + ") partitioned by (tags)"))
                .cause()
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("is not supported");
    }

    /** Array columns must be rejected as primary keys at table-creation time. */
    @Test
    void testArrayTypeAsPrimaryKeyThrowsException() {
        assertThatThrownBy(
                        () ->
                                tEnv.executeSql(
                                        "create table array_pk_invalid ("
                                                + "id int, "
                                                + "data array<string>, "
                                                + "primary key(data) not enforced"
                                                + ")"))
                .cause()
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("is not supported");
    }

    /** Array columns must be rejected as bucket keys at table-creation time. */
    @Test
    void testArrayTypeAsBucketKeyThrowsException() {
        assertThatThrownBy(
                        () ->
                                tEnv.executeSql(
                                        "create table array_bucket_test ("
                                                + "id int, "
                                                + "data array<string>, "
                                                + "primary key(id) not enforced"
                                                + ") with ('bucket.key' = 'data', 'bucket.num' = '3')"))
                .cause()
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("is not supported");
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.utils; + +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.table.api.DataTypes.ARRAY; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link FlinkSchemaValidator}. 
public class FlinkSchemaValidatorTest {

    /** A non-array primary key column passes validation without throwing. */
    @Test
    void testValidatePrimaryKeyColumnsWithValidTypes() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("name", STRING())),
                        Collections.emptyList(),
                        UniqueConstraint.primaryKey("PK_id", Collections.singletonList("id")));

        List<String> primaryKeyColumns = Collections.singletonList("id");
        // No assertion needed: success means no exception is thrown.
        FlinkSchemaValidator.validatePrimaryKeyColumns(schema, primaryKeyColumns);
    }

    /** An ARRAY-typed primary key column is rejected with a descriptive message. */
    @Test
    void testValidatePrimaryKeyColumnsWithArrayType() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", ARRAY(INT()).notNull()),
                                Column.physical("name", STRING())),
                        Collections.emptyList(),
                        UniqueConstraint.primaryKey("PK_id", Collections.singletonList("id")));

        List<String> primaryKeyColumns = Collections.singletonList("id");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validatePrimaryKeyColumns(
                                        schema, primaryKeyColumns))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Column 'id' of ARRAY type is not supported as primary key.");
    }

    /** Referencing a primary key column absent from the schema raises CatalogException. */
    @Test
    void testValidatePrimaryKeyColumnsWithMissingColumn() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Collections.singletonList(Column.physical("name", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> primaryKeyColumns = Collections.singletonList("id");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validatePrimaryKeyColumns(
                                        schema, primaryKeyColumns))
                .isInstanceOf(CatalogException.class)
                .hasMessageContaining("Primary key column id not found in schema.");
    }

    /** Multiple non-array partition key columns pass validation without throwing. */
    @Test
    void testValidatePartitionKeyColumnsWithValidTypes() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("name", STRING()),
                                Column.physical("date", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> partitionKeyColumns = Arrays.asList("name", "date");
        FlinkSchemaValidator.validatePartitionKeyColumns(schema, partitionKeyColumns);
    }

    /** An ARRAY-typed partition key column is rejected with a descriptive message. */
    @Test
    void testValidatePartitionKeyColumnsWithArrayType() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("tags", ARRAY(STRING())),
                                Column.physical("name", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> partitionKeyColumns = Collections.singletonList("tags");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validatePartitionKeyColumns(
                                        schema, partitionKeyColumns))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining(
                        "Column 'tags' of ARRAY type is not supported as partition key.");
    }

    /** Referencing a partition key column absent from the schema raises CatalogException. */
    @Test
    void testValidatePartitionKeyColumnsWithMissingColumn() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Collections.singletonList(Column.physical("name", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> partitionKeyColumns = Collections.singletonList("date");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validatePartitionKeyColumns(
                                        schema, partitionKeyColumns))
                .isInstanceOf(CatalogException.class)
                .hasMessageContaining("Partition key column date not found in schema.");
    }

    /** Multiple non-array bucket key columns pass validation without throwing. */
    @Test
    void testValidateBucketKeyColumnsWithValidTypes() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("name", STRING()),
                                Column.physical("category", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> bucketKeyColumns = Arrays.asList("id", "name");
        FlinkSchemaValidator.validateBucketKeyColumns(schema, bucketKeyColumns);
    }

    /** An ARRAY-typed bucket key column is rejected with a descriptive message. */
    @Test
    void testValidateBucketKeyColumnsWithArrayType() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("tags", ARRAY(STRING())),
                                Column.physical("name", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> bucketKeyColumns = Collections.singletonList("tags");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validateBucketKeyColumns(
                                        schema, bucketKeyColumns))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining(
                        "Column 'tags' of ARRAY type is not supported as bucket key.");
    }

    /** Referencing a bucket key column absent from the schema raises CatalogException. */
    @Test
    void testValidateBucketKeyColumnsWithMissingColumn() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Collections.singletonList(Column.physical("name", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> bucketKeyColumns = Collections.singletonList("bucket_col");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validateBucketKeyColumns(
                                        schema, bucketKeyColumns))
                .isInstanceOf(CatalogException.class)
                .hasMessageContaining("Bucket key column bucket_col not found in schema.");
    }

    /** A composite primary key of non-array columns passes even if the schema has an array column elsewhere. */
    @Test
    void testValidatePrimaryKeyColumnsWithMultipleColumns() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("name", STRING().notNull()),
                                Column.physical("tags", ARRAY(STRING()))),
                        Collections.emptyList(),
                        UniqueConstraint.primaryKey("PK_id_name", Arrays.asList("id", "name")));

        List<String> primaryKeyColumns = Arrays.asList("id", "name");
        FlinkSchemaValidator.validatePrimaryKeyColumns(schema, primaryKeyColumns);
    }

    /** A composite primary key is rejected if even one of its columns is an array. */
    @Test
    void testValidatePrimaryKeyColumnsWithMultipleColumnsOneArray() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("name", STRING().notNull()),
                                Column.physical("tags", ARRAY(STRING()).notNull())),
                        Collections.emptyList(),
                        UniqueConstraint.primaryKey("PK_id_tags", Arrays.asList("id", "tags")));

        List<String> primaryKeyColumns = Arrays.asList("id", "tags");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validatePrimaryKeyColumns(
                                        schema, primaryKeyColumns))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining(
                        "Column 'tags' of ARRAY type is not supported as primary key.");
    }
}
