This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e3263fa7c2 [ASTERIXDB-3324][STO] Ensure PKs uniqueness on load
e3263fa7c2 is described below

commit e3263fa7c21dbe5fb7e80d1925b0a889dc483449
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Tue Dec 5 10:47:58 2023 -0800

    [ASTERIXDB-3324][STO] Ensure PKs uniqueness on load
    
    Details:
    On load, an exception should be thrown if duplicate primary keys (PKs)
    are encountered.
    
    Change-Id: I140589d92b142fb4661b4112a28424fa168684ef
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17996
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../apache/asterix/column/ColumnManagerFactory.java  |  7 +++++--
 .../lsm/load/LoadColumnTupleReaderWriterFactory.java |  8 ++++++--
 .../operation/lsm/load/LoadColumnTupleWriter.java    | 20 +++++++++++++++++++-
 .../lsm/btree/column/api/IColumnManagerFactory.java  |  6 +++++-
 .../lsm/btree/column/utils/LSMColumnBTreeUtil.java   |  2 +-
 5 files changed, 36 insertions(+), 7 deletions(-)

diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java
index cfd514353c..2780f927a4 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/ColumnManagerFactory.java
@@ -26,6 +26,7 @@ import 
org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleReaderWrite
 import 
org.apache.asterix.column.operation.lsm.load.LoadColumnTupleReaderWriterFactory;
 import 
org.apache.asterix.column.operation.lsm.merge.MergeColumnTupleReaderWriterFactory;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IJsonSerializable;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
@@ -71,8 +72,10 @@ public final class ColumnManagerFactory implements 
IColumnManagerFactory {
     }
 
     @Override
-    public AbstractColumnTupleReaderWriterFactory 
getLoadColumnTupleReaderWriterFactory() {
-        return new LoadColumnTupleReaderWriterFactory(pageSize, maxTupleCount, 
tolerance, maxLeafNodeSize);
+    public AbstractColumnTupleReaderWriterFactory 
getLoadColumnTupleReaderWriterFactory(
+            IBinaryComparatorFactory[] cmpFactories) {
+        return new LoadColumnTupleReaderWriterFactory(pageSize, maxTupleCount, 
tolerance, maxLeafNodeSize,
+                cmpFactories);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
index dec2ec3934..7fc6fbdd91 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
@@ -20,20 +20,24 @@ package org.apache.asterix.column.operation.lsm.load;
 
 import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
 import 
org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleReaderWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+import org.apache.hyracks.storage.common.MultiComparator;
 
 public class LoadColumnTupleReaderWriterFactory extends 
FlushColumnTupleReaderWriterFactory {
     private static final long serialVersionUID = -7583574057314353873L;
+    private final IBinaryComparatorFactory[] cmpFactories;
 
     public LoadColumnTupleReaderWriterFactory(int pageSize, int 
maxNumberOfTuples, double tolerance,
-            int maxLeafNodeSize) {
+            int maxLeafNodeSize, IBinaryComparatorFactory[] cmpFactories) {
         super(pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
+        this.cmpFactories = cmpFactories;
     }
 
     @Override
     public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata 
columnMetadata) {
         return new LoadColumnTupleWriter((FlushColumnMetadata) columnMetadata, 
pageSize, maxNumberOfTuples, tolerance,
-                maxLeafNodeSize);
+                maxLeafNodeSize, MultiComparator.create(cmpFactories));
     }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
index e47b210fa5..ca14000d45 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
@@ -20,17 +20,35 @@ package org.apache.asterix.column.operation.lsm.load;
 
 import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
 import org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleWriter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import 
org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import org.apache.hyracks.storage.common.MultiComparator;
 
 public class LoadColumnTupleWriter extends FlushColumnTupleWriter {
+    private final PointableTupleReference prevTupleKeys;
+    private final MultiComparator comparator;
+
     public LoadColumnTupleWriter(FlushColumnMetadata columnMetadata, int 
pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize) {
+            double tolerance, int maxLeafNodeSize, MultiComparator comparator) 
{
         super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, 
maxLeafNodeSize);
+        prevTupleKeys =
+                
PointableTupleReference.create(columnMetadata.getNumberOfPrimaryKeys(), 
ArrayBackedValueStorage::new);
+        this.comparator = comparator;
     }
 
     @Override
     public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+        ensureKeysUniqueness(tuple);
         writeRecord(tuple);
     }
+
+    private void ensureKeysUniqueness(ITupleReference tuple) throws 
HyracksDataException {
+        if (prevTupleKeys.getFieldLength(0) > 0 && 
comparator.compare(prevTupleKeys, tuple) == 0) {
+            throw HyracksDataException.create(ErrorCode.DUPLICATE_LOAD_INPUT);
+        }
+        prevTupleKeys.set(tuple);
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManagerFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManagerFactory.java
index a2dfbcf0da..d1b99468f9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManagerFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnManagerFactory.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.storage.am.lsm.btree.column.api;
 
 import java.io.Serializable;
 
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.io.IJsonSerializable;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 
@@ -31,8 +32,11 @@ public interface IColumnManagerFactory extends Serializable, 
IJsonSerializable {
 
     /**
      * Get column tuple reader/writer for the {@link LSMIOOperationType#LOAD}
+     *
+     * @param cmpFactories comparator factories for the primary keys
      */
-    AbstractColumnTupleReaderWriterFactory 
getLoadColumnTupleReaderWriterFactory();
+    AbstractColumnTupleReaderWriterFactory 
getLoadColumnTupleReaderWriterFactory(
+            IBinaryComparatorFactory[] cmpFactories);
 
     /**
      * Get column tuple reader/writer for the {@link LSMIOOperationType#FLUSH}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
index 6ef0dc6f1d..490fa2e0bb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
@@ -82,7 +82,7 @@ public class LSMColumnBTreeUtil {
         ITreeIndexFrameFactory mergeLeafFrameFactory = new 
ColumnBTreeLeafFrameFactory(copyTupleWriterFactory,
                 
columnManagerFactory.createMergeColumnTupleReaderWriterFactory());
         ITreeIndexFrameFactory bulkLoadLeafFrameFactory = new 
ColumnBTreeLeafFrameFactory(bulkLoadTupleWriterFactory,
-                columnManagerFactory.getLoadColumnTupleReaderWriterFactory());
+                
columnManagerFactory.getLoadColumnTupleReaderWriterFactory(cmpFactories));
         ITreeIndexFrameFactory insertLeafFrameFactory = new 
BTreeNSMLeafFrameFactory(insertTupleWriterFactory);
         ITreeIndexFrameFactory deleteLeafFrameFactory = new 
BTreeNSMLeafFrameFactory(deleteTupleWriterFactory);
         ITreeIndexFrameFactory interiorFrameFactory = new 
BTreeNSMInteriorFrameFactory(insertTupleWriterFactory);

Reply via email to