This is an automated email from the ASF dual-hosted git repository.

xikui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 44f2abd  [NO ISSUE] Adding more extension APIs
44f2abd is described below

commit 44f2abde4998310c1ab61d3ddf4369f76fe70a3c
Author: Xikui Wang <[email protected]>
AuthorDate: Thu Aug 20 14:55:20 2020 -0700

    [NO ISSUE] Adding more extension APIs
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    1. Allow extensions to override LangExpressionToPlanTranslator to
    translate insert/upsert/delete statements.
    2. Reuse the current statement executor in feeds when possible, so that
    extensions' translators are used properly.
    3. Add new methods in MetadataProvider that extensions can override to
    use customized LSM runtime operators.
    4. Add a new hook in the LSM runtime operators that allows extensions to
    modify data before it is persisted.
    
    Change-Id: Iada557f15af9de6fbdb6f6d4e41c0266c1d3fbff
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7643
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Xikui Wang <[email protected]>
---
 .../translator/LangExpressionToPlanTranslator.java | 14 +++---
 .../asterix/app/active/FeedEventsListener.java     | 10 ++--
 .../asterix/app/translator/QueryTranslator.java    |  8 ++--
 .../org/apache/asterix/utils/FeedOperations.java   |  9 ++--
 .../asterix/test/active/ActiveStatsTest.java       |  2 +-
 .../test/active/DummyFeedEventsListener.java       |  6 ++-
 .../LSMInsertDeleteOperatorNodePushable.java       | 12 ++---
 .../LSMTreeInsertDeleteOperatorDescriptor.java     |  2 +-
 .../metadata/declared/MetadataProvider.java        | 33 +++++++++++--
 .../LSMPrimaryInsertOperatorDescriptor.java        |  8 ++--
 .../LSMPrimaryInsertOperatorNodePushable.java      | 15 ++++--
 .../LSMPrimaryUpsertOperatorDescriptor.java        | 14 +++---
 .../LSMPrimaryUpsertOperatorNodePushable.java      | 56 +++++++++++++---------
 13 files changed, 122 insertions(+), 67 deletions(-)

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index eb4bfd4..4bb1adf 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -443,7 +443,7 @@ abstract class LangExpressionToPlanTranslator
         return plan;
     }
 
-    private ILogicalOperator translateDelete(DatasetDataSource 
targetDatasource, Mutable<ILogicalExpression> varRef,
+    protected ILogicalOperator translateDelete(DatasetDataSource 
targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, 
ILogicalOperator assign,
             ICompiledDmlStatement stmt) throws AlgebricksException {
@@ -464,7 +464,7 @@ abstract class LangExpressionToPlanTranslator
         return leafOperator;
     }
 
-    private ILogicalOperator translateUpsert(DatasetDataSource 
targetDatasource, Mutable<ILogicalExpression> varRef,
+    protected ILogicalOperator translateUpsert(DatasetDataSource 
targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, 
ILogicalOperator assign,
             List<String> additionalFilteringField, LogicalVariable unnestVar, 
ILogicalOperator topOp,
@@ -579,7 +579,7 @@ abstract class LangExpressionToPlanTranslator
         return processReturningExpression(rootOperator, upsertOp, 
compiledUpsert, resultMetadata);
     }
 
-    private ILogicalOperator translateInsert(DatasetDataSource 
targetDatasource, Mutable<ILogicalExpression> varRef,
+    protected ILogicalOperator translateInsert(DatasetDataSource 
targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, 
ILogicalOperator assign,
             ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws 
AlgebricksException {
@@ -609,7 +609,7 @@ abstract class LangExpressionToPlanTranslator
 
     // Stitches the translated operators for the returning expression into the 
query
     // plan.
-    private ILogicalOperator processReturningExpression(ILogicalOperator 
inputOperator,
+    protected ILogicalOperator processReturningExpression(ILogicalOperator 
inputOperator,
             InsertDeleteUpsertOperator insertOp, CompiledInsertStatement 
compiledInsert, IResultMetadata resultMetadata)
             throws AlgebricksException {
         Expression returnExpression = compiledInsert.getReturnExpression();
@@ -651,7 +651,7 @@ abstract class LangExpressionToPlanTranslator
         return distResultOperator;
     }
 
-    private DatasetDataSource validateDatasetInfo(MetadataProvider 
metadataProvider, DataverseName dataverseName,
+    protected DatasetDataSource validateDatasetInfo(MetadataProvider 
metadataProvider, DataverseName dataverseName,
             String datasetName, SourceLocation sourceLoc) throws 
AlgebricksException {
         Dataset dataset = metadataProvider.findDataset(dataverseName, 
datasetName);
         if (dataset == null) {
@@ -671,7 +671,7 @@ abstract class LangExpressionToPlanTranslator
                 dataset.getDatasetDetails(), domain);
     }
 
-    private FileSplit getDefaultOutputFileLocation(ICcApplicationContext 
appCtx) throws AlgebricksException {
+    protected FileSplit getDefaultOutputFileLocation(ICcApplicationContext 
appCtx) throws AlgebricksException {
         String outputDir = System.getProperty("java.io.tmpDir");
         String filePath =
                 outputDir + System.getProperty("file.separator") + 
OUTPUT_FILE_PREFIX + outputFileID.incrementAndGet();
@@ -1760,7 +1760,7 @@ abstract class LangExpressionToPlanTranslator
      *            the query plan.
      * @throws CompilationException
      */
-    private void eliminateSharedOperatorReferenceForPlan(ILogicalPlan plan) 
throws CompilationException {
+    protected void eliminateSharedOperatorReferenceForPlan(ILogicalPlan plan) 
throws CompilationException {
         for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
             Set<Mutable<ILogicalOperator>> opRefSet = new HashSet<>();
             eliminateSharedOperatorReference(opRef, opRefSet);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index d4a52fb..c4db4eb 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -26,6 +26,7 @@ import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -50,14 +51,17 @@ public class FeedEventsListener extends 
ActiveEntityEventsListener {
 
     private final Feed feed;
     private final List<FeedConnection> feedConnections;
+    private final ILangExtension.Language translatorLang;
 
     public FeedEventsListener(IStatementExecutor statementExecutor, 
ICcApplicationContext appCtx,
             IHyracksClientConnection hcc, EntityId entityId, List<Dataset> 
datasets,
             AlgebricksAbsolutePartitionConstraint locations, String 
runtimeName, IRetryPolicyFactory retryPolicyFactory,
-            Feed feed, final List<FeedConnection> feedConnections) throws 
HyracksDataException {
+            Feed feed, final List<FeedConnection> feedConnections, 
ILangExtension.Language translatorLang)
+            throws HyracksDataException {
         super(statementExecutor, appCtx, hcc, entityId, datasets, locations, 
runtimeName, retryPolicyFactory);
         this.feed = feed;
         this.feedConnections = feedConnections;
+        this.translatorLang = translatorLang;
     }
 
     @Override
@@ -94,8 +98,8 @@ public class FeedEventsListener extends 
ActiveEntityEventsListener {
     @Override
     protected JobId compileAndStartJob(MetadataProvider mdProvider) throws 
HyracksDataException {
         try {
-            Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> 
jobInfo =
-                    FeedOperations.buildStartFeedJob(mdProvider, feed, 
feedConnections, statementExecutor, hcc);
+            Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> 
jobInfo = FeedOperations
+                    .buildStartFeedJob(mdProvider, feed, feedConnections, 
statementExecutor, hcc, translatorLang);
             JobSpecification feedJob = jobInfo.getLeft();
             
feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, 
entityId);
             // TODO(Yingyi): currently we do not check IFrameWriter protocol 
violations for Feed jobs.
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 78a82d6..f8e0cee 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1986,7 +1986,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         declaredFunctions.add(fds);
     }
 
-    protected void handleCreateFunctionStatement(MetadataProvider 
metadataProvider, Statement stmt,
+    public void handleCreateFunctionStatement(MetadataProvider 
metadataProvider, Statement stmt,
             IStatementRewriter stmtRewriter) throws Exception {
         CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
         FunctionSignature signature = cfs.getFunctionSignature();
@@ -2985,7 +2985,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
     }
 
-    private void handleStartFeedStatement(MetadataProvider metadataProvider, 
Statement stmt,
+    protected void handleStartFeedStatement(MetadataProvider metadataProvider, 
Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         StartFeedStatement sfs = (StartFeedStatement) stmt;
         SourceLocation sourceLoc = sfs.getSourceLocation();
@@ -3024,7 +3024,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 }
                 listener = new FeedEventsListener(this, 
metadataProvider.getApplicationContext(), hcc, entityId,
                         datasets, null, 
FeedIntakeOperatorNodePushable.class.getSimpleName(),
-                        NoRetryPolicyFactory.INSTANCE, feed, feedConnections);
+                        NoRetryPolicyFactory.INSTANCE, feed, feedConnections, 
compilationProvider.getLanguage());
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             committed = true;
@@ -3039,7 +3039,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
     }
 
-    private void handleStopFeedStatement(MetadataProvider metadataProvider, 
Statement stmt) throws Exception {
+    protected void handleStopFeedStatement(MetadataProvider metadataProvider, 
Statement stmt) throws Exception {
         StopFeedStatement sfst = (StopFeedStatement) stmt;
         SourceLocation sourceLoc = sfst.getSourceLocation();
         DataverseName dataverseName = 
getActiveDataverseName(sfst.getDataverseName());
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 20a97ed..68234b7 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.app.result.ResponsePrinter;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.app.translator.QueryTranslator;
@@ -450,7 +451,8 @@ public class FeedOperations {
 
     public static Pair<JobSpecification, 
AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
             MetadataProvider metadataProvider, Feed feed, List<FeedConnection> 
feedConnections,
-            IStatementExecutor statementExecutor, IHyracksClientConnection 
hcc) throws Exception {
+            IStatementExecutor statementExecutor, IHyracksClientConnection 
hcc, ILangExtension.Language translatorLang)
+            throws Exception {
         FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>());
         Pair<JobSpecification, ITypedAdapterFactory> intakeInfo = 
buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
         List<JobSpecification> jobsList = new ArrayList<>();
@@ -465,8 +467,9 @@ public class FeedOperations {
         metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
                 StringUtils.join(ingestionLocations, ','));
         // TODO: Once we deprecated AQL, this extra queryTranslator can be 
removed.
-        IStatementExecutor translator =
-                getSQLPPTranslator(metadataProvider, ((QueryTranslator) 
statementExecutor).getSessionOutput());
+        IStatementExecutor translator = translatorLang == 
ILangExtension.Language.AQL
+                ? getSQLPPTranslator(metadataProvider, ((QueryTranslator) 
statementExecutor).getSessionOutput())
+                : statementExecutor;
         // Add connection job
         for (FeedConnection feedConnection : feedConnections) {
             JobSpecification connectionJob =
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 963ba7c..b23b3cf 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -107,7 +107,7 @@ public class ActiveStatsTest {
         // Add event listener
         ActiveEntityEventsListener eventsListener = new 
DummyFeedEventsListener(statementExecutor, appCtx, null,
                 entityId, datasetList, partitionConstraint, 
FeedIntakeOperatorNodePushable.class.getSimpleName(),
-                NoRetryPolicyFactory.INSTANCE, null, Collections.emptyList());
+                NoRetryPolicyFactory.INSTANCE, null, Collections.emptyList(), 
Language.SQLPP);
         // Register mock runtime
         NCAppRuntimeContext nc1AppCtx =
                 (NCAppRuntimeContext) 
ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext();
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
index 88f1332..884d209 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
@@ -26,6 +26,7 @@ import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.app.active.FeedEventsListener;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
@@ -43,9 +44,10 @@ public class DummyFeedEventsListener extends 
FeedEventsListener {
     public DummyFeedEventsListener(IStatementExecutor statementExecutor, 
ICcApplicationContext appCtx,
             IHyracksClientConnection hcc, EntityId entityId, List<Dataset> 
datasets,
             AlgebricksAbsolutePartitionConstraint locations, String 
runtimeName, IRetryPolicyFactory retryPolicyFactory,
-            Feed feed, List<FeedConnection> feedConnections) throws 
HyracksDataException {
+            Feed feed, List<FeedConnection> feedConnections, 
ILangExtension.Language translatorLang)
+            throws HyracksDataException {
         super(statementExecutor, appCtx, hcc, entityId, datasets, locations, 
runtimeName, retryPolicyFactory, feed,
-                feedConnections);
+                feedConnections, translatorLang);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 5e4b5a1..2ad2f7a 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -48,11 +48,11 @@ import 
org.apache.hyracks.storage.common.IIndexAccessParameters;
 public class LSMInsertDeleteOperatorNodePushable extends 
LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
     public static final String KEY_INDEX = "Index";
-    private final boolean isPrimary;
-    private final SourceLocation sourceLoc;
+    protected final boolean isPrimary;
+    protected final SourceLocation sourceLoc;
     // This class has both lsmIndex and index (in super class) pointing to the 
same object
     private AbstractLSMIndex lsmIndex;
-    private int i = 0;
+    protected int i = 0;
 
     /**
      * The following three variables are used to keep track of the information 
regarding flushing partial frame such as
@@ -64,9 +64,9 @@ public class LSMInsertDeleteOperatorNodePushable extends 
LSMIndexInsertUpdateDel
      * ==> captured in currentTupleIdx variable
      * These variables are reset for each frame, i.e., whenever nextFrame() is 
called, these variables are reset.
      */
-    private boolean flushedPartialTuples;
-    private int currentTupleIdx;
-    private int lastFlushedTupleIdx;
+    protected boolean flushedPartialTuples;
+    protected int currentTupleIdx;
+    protected int lastFlushedTupleIdx;
 
     public LSMInsertDeleteOperatorNodePushable(IHyracksTaskContext ctx, int 
partition, int[] fieldPermutation,
             RecordDescriptor inputRecDesc, IndexOperation op, boolean 
isPrimary,
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
index 45661e4..2226ca0 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
@@ -33,7 +33,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdat
 public class LSMTreeInsertDeleteOperatorDescriptor extends 
LSMTreeIndexInsertUpdateDeleteOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final boolean isPrimary;
+    protected final boolean isPrimary;
 
     public LSMTreeInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry 
spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IndexOperation op, 
IIndexDataflowHelperFactory indexHelperFactory,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 2933756..8a16014 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -131,12 +131,14 @@ import 
org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFact
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultSetId;
@@ -975,10 +977,18 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 fieldPermutation[i++] = idx;
             }
         }
-        return DatasetUtil.createPrimaryIndexUpsertOp(spec, this, dataset, 
recordDesc, fieldPermutation,
+        return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, 
fieldPermutation,
                 context.getMissingWriterFactory());
     }
 
+    protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
createPrimaryIndexUpsertOp(JobSpecification spec,
+            MetadataProvider metadataProvider, Dataset dataset, 
RecordDescriptor inputRecordDesc,
+            int[] fieldPermutation, IMissingWriterFactory 
missingWriterFactory) throws AlgebricksException {
+        // this can be used by extensions to pick up their own operators
+        return DatasetUtil.createPrimaryIndexUpsertOp(spec, this, dataset, 
inputRecordDesc, fieldPermutation,
+                missingWriterFactory);
+    }
+
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildExternalDatasetDataScannerRuntime(
             JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory 
adapterFactory) throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.OBJECT) {
@@ -1120,17 +1130,34 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                     pkidfh = new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
                             primaryKeySplitsAndConstraint.first);
                 }
-                op = new LSMPrimaryInsertOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation, idfh, pkidfh,
+                op = createLSMPrimaryInsertOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation, idfh, pkidfh,
                         modificationCallbackFactory, searchCallbackFactory, 
numKeys, filterFields);
 
             } else {
-                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation, indexOp, idfh,
+                op = createLSMTreeInsertDeleteOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation, indexOp, idfh,
                         null, true, modificationCallbackFactory);
             }
         }
         return new Pair<>(op, splitsAndConstraint.second);
     }
 
+    protected LSMPrimaryInsertOperatorDescriptor 
createLSMPrimaryInsertOperatorDescriptor(JobSpecification spec,
+            RecordDescriptor inputRecordDesc, int[] fieldPermutation, 
IIndexDataflowHelperFactory idfh,
+            IIndexDataflowHelperFactory pkidfh, 
IModificationOperationCallbackFactory modificationCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, int 
numKeys, int[] filterFields) {
+        // this can be used by extensions to pick up their own operators
+        return new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, 
fieldPermutation, idfh, pkidfh,
+                modificationCallbackFactory, searchCallbackFactory, numKeys, 
filterFields);
+    }
+
+    protected LSMTreeInsertDeleteOperatorDescriptor 
createLSMTreeInsertDeleteOperatorDescriptor(
+            IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, 
int[] fieldPermutation, IndexOperation op,
+            IIndexDataflowHelperFactory indexHelperFactory, 
ITupleFilterFactory tupleFilterFactory, boolean isPrimary,
+            IModificationOperationCallbackFactory modCallbackFactory) {
+        return new LSMTreeInsertDeleteOperatorDescriptor(spec, outRecDesc, 
fieldPermutation, op, indexHelperFactory,
+                tupleFilterFactory, isPrimary, modCallbackFactory);
+    }
+
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getIndexInsertOrDeleteOrUpsertRuntime(
             IndexOperation indexOp, IDataSourceIndex<String, DataSourceId> 
dataSourceIndex,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, 
IVariableTypeEnvironment typeEnv,
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
index 42b8f29..d639f3d 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
@@ -34,10 +34,10 @@ public class LSMPrimaryInsertOperatorDescriptor extends 
LSMTreeInsertDeleteOpera
 
     private static final long serialVersionUID = 1L;
 
-    private final IIndexDataflowHelperFactory keyIndexHelperFactory;
-    private final ISearchOperationCallbackFactory searchOpCallbackFactory;
-    private final int numOfPrimaryKeys;
-    private final int[] filterFields;
+    protected final IIndexDataflowHelperFactory keyIndexHelperFactory;
+    protected final ISearchOperationCallbackFactory searchOpCallbackFactory;
+    protected final int numOfPrimaryKeys;
+    protected final int[] filterFields;
 
     public LSMPrimaryInsertOperatorDescriptor(IOperatorDescriptorRegistry 
spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IIndexDataflowHelperFactory 
indexHelperFactory,
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 0bb42d4..a886161 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -105,7 +105,17 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
             }
         }
         keyTuple = new PermutingFrameTupleReference(searchKeyPermutations);
-        processor = new IFrameTupleProcessor() {
+        processor = createTupleProcessor(sourceLoc);
+        frameOpCallback = 
NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx, 
lsmAccessor);
+    }
+
+    protected void beforeModification(ITupleReference tuple) {
+        // this is used for extensions to modify tuples before persistence
+        // do nothing in the master branch
+    }
+
+    protected IFrameTupleProcessor createTupleProcessor(SourceLocation 
sourceLoc) {
+        return new IFrameTupleProcessor() {
             @Override
             public void process(ITupleReference tuple, int index) throws 
HyracksDataException {
                 if (index < currentTupleIdx) {
@@ -127,6 +137,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                     cursor.close();
                 }
                 if (!duplicate) {
+                    beforeModification(tuple);
                     lsmAccessor.forceUpsert(tuple);
                     if (lsmAccessorForKeyIndex != null) {
                         lsmAccessorForKeyIndex.forceUpsert(keyTuple);
@@ -157,8 +168,6 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                 // no op
             }
         };
-
-        frameOpCallback = 
NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx, 
lsmAccessor);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
index 7587ca6..43f54f2 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
@@ -36,13 +36,13 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFacto
 public class LSMPrimaryUpsertOperatorDescriptor extends 
LSMTreeInsertDeleteOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final IFrameOperationCallbackFactory frameOpCallbackFactory;
-    private final ARecordType recordType;
-    private final int filterIndex;
-    private ISearchOperationCallbackFactory searchOpCallbackFactory;
-    private final int numPrimaryKeys;
-    private final IMissingWriterFactory missingWriterFactory;
-    private final boolean hasSecondaries;
+    protected final IFrameOperationCallbackFactory frameOpCallbackFactory;
+    protected final ARecordType recordType;
+    protected final int filterIndex;
+    protected ISearchOperationCallbackFactory searchOpCallbackFactory;
+    protected final int numPrimaryKeys;
+    protected final IMissingWriterFactory missingWriterFactory;
+    protected final boolean hasSecondaries;
 
     public LSMPrimaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry 
spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IIndexDataflowHelperFactory 
indexHelperFactory,
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 3e43e02..316665d 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -83,17 +83,17 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
     private static final Logger LOGGER = LogManager.getLogger();
     private static final ThreadLocal<DateFormat> DATE_FORMAT =
             ThreadLocal.withInitial(() -> new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
-    private final PermutingFrameTupleReference key;
+    protected final PermutingFrameTupleReference key;
     private MultiComparator keySearchCmp;
     private ArrayTupleBuilder missingTupleBuilder;
     private final IMissingWriter missingWriter;
-    private ArrayTupleBuilder tb;
+    protected ArrayTupleBuilder tb;
     private DataOutput dos;
-    private RangePredicate searchPred;
-    private IIndexCursor cursor;
-    private ITupleReference prevTuple;
-    private final int numOfPrimaryKeys;
-    boolean isFiltered = false;
+    protected RangePredicate searchPred;
+    protected IIndexCursor cursor;
+    protected ITupleReference prevTuple;
+    protected final int numOfPrimaryKeys;
+    protected boolean isFiltered = false;
     private final ArrayTupleReference prevTupleWithFilter = new 
ArrayTupleReference();
     private ArrayTupleBuilder prevRecWithPKWithFilterValue;
     private ARecordType recordType;
@@ -103,13 +103,13 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
     private final boolean hasMeta;
     private final int filterFieldIndex;
     private final int metaFieldIndex;
-    private LockThenSearchOperationCallback searchCallback;
-    private IFrameOperationCallback frameOpCallback;
+    protected LockThenSearchOperationCallback searchCallback;
+    protected IFrameOperationCallback frameOpCallback;
     private final IFrameOperationCallbackFactory frameOpCallbackFactory;
-    private AbstractIndexModificationOperationCallback abstractModCallback;
+    protected AbstractIndexModificationOperationCallback abstractModCallback;
     private final ISearchOperationCallbackFactory searchCallbackFactory;
     private final IFrameTupleProcessor processor;
-    private LSMTreeIndexAccessor lsmAccessor;
+    protected LSMTreeIndexAccessor lsmAccessor;
     private final ITracer tracer;
     private final long traceCategory;
     private long lastRecordInTimeStamp = 0L;
@@ -144,7 +144,18 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
             this.prevRecWithPKWithFilterValue = new 
ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
             this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
         }
-        processor = new IFrameTupleProcessor() {
+        processor = createTupleProcessor(hasSecondaries);
+        tracer = ctx.getJobletContext().getServiceContext().getTracer();
+        traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
+    }
+
+    protected void beforeModification(ITupleReference tuple) {
+        // this is used for extensions to modify tuples before persistence
+        // do nothing in the master branch
+    }
+
+    protected IFrameTupleProcessor createTupleProcessor(final boolean 
hasSecondaries) {
+        return new IFrameTupleProcessor() {
             @Override
             public void process(ITupleReference tuple, int index) throws 
HyracksDataException {
                 try {
@@ -176,6 +187,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                         appendUpsertIndicator(!isDelete);
                         appendPreviousTupleAsMissing();
                     }
+                    beforeModification(tuple);
                     if (isDelete && prevTuple != null) {
                         // Only delete if it is a delete and not upsert
                         // And previous tuple with the same key was found
@@ -213,8 +225,6 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                 frameOpCallback.fail(th);
             }
         };
-        tracer = ctx.getJobletContext().getServiceContext().getTracer();
-        traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
     }
 
     // we have the permutation which has [pk locations, record location, 
optional:filter-location]
@@ -285,12 +295,12 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
         }
     }
 
-    private void resetSearchPredicate(int tupleIndex) {
+    protected void resetSearchPredicate(int tupleIndex) {
         key.reset(accessor, tupleIndex);
         searchPred.reset(key, key, true, true, keySearchCmp, keySearchCmp);
     }
 
-    private void writeOutput(int tupleIndex, boolean recordWasInserted, 
boolean recordWasDeleted) throws IOException {
+    protected void writeOutput(int tupleIndex, boolean recordWasInserted, 
boolean recordWasDeleted) throws IOException {
         if (recordWasInserted || recordWasDeleted) {
             frameTuple.reset(accessor, tupleIndex);
             for (int i = 0; i < frameTuple.getFieldCount(); i++) {
@@ -307,7 +317,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
         }
     }
 
-    private static boolean isDeleteOperation(ITupleReference t1, int field) {
+    protected static boolean isDeleteOperation(ITupleReference t1, int field) {
         return TypeTagUtil.isType(t1, field, 
ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
     }
 
@@ -326,7 +336,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
         }
     }
 
-    private void appendFilterToOutput() throws IOException {
+    protected void appendFilterToOutput() throws IOException {
         // if with filters, append the filter
         if (isFiltered) {
             dos.write(prevTuple.getFieldData(filterFieldIndex), 
prevTuple.getFieldStart(filterFieldIndex),
@@ -335,18 +345,18 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
         }
     }
 
-    private void appendUpsertIndicator(boolean isUpsert) throws IOException {
+    protected void appendUpsertIndicator(boolean isUpsert) throws IOException {
         recordDesc.getFields()[0].serialize(isUpsert ? ABoolean.TRUE : 
ABoolean.FALSE, dos);
         tb.addFieldEndOffset();
     }
 
-    private void appendPrevRecord() throws IOException {
+    protected void appendPrevRecord() throws IOException {
         dos.write(prevTuple.getFieldData(numOfPrimaryKeys), 
prevTuple.getFieldStart(numOfPrimaryKeys),
                 prevTuple.getFieldLength(numOfPrimaryKeys));
         tb.addFieldEndOffset();
     }
 
-    private void appendPreviousMeta() throws IOException {
+    protected void appendPreviousMeta() throws IOException {
         // if has meta, then append meta
         if (hasMeta) {
             dos.write(prevTuple.getFieldData(metaFieldIndex), 
prevTuple.getFieldStart(metaFieldIndex),
@@ -355,7 +365,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
         }
     }
 
-    private void appendPreviousTupleAsMissing() throws IOException {
+    protected void appendPreviousTupleAsMissing() throws IOException {
         prevTuple = null;
         writeMissingField();
         if (hasMeta) {
@@ -376,7 +386,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
         appender.write(writer, true);
     }
 
-    private void appendFilterToPrevTuple() throws IOException {
+    protected void appendFilterToPrevTuple() throws IOException {
         if (isFiltered) {
             prevRecWithPKWithFilterValue.reset();
             for (int i = 0; i < prevTuple.getFieldCount(); i++) {

Reply via email to