This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 835bc42b37e780a52e25f00a1e431f82d9d774cd
Merge: 746e3f5fea 7c7d24096e
Author: Michael Blow <[email protected]>
AuthorDate: Thu May 9 22:10:53 2024 -0400

    Merge branch 'gerrit/trinity' into 'master'
    
    Change-Id: I4408569b9d202ef120fe042781710cf04af884a1

 .../apache/asterix/translator/ClientRequest.java   |  98 ++++++++++++-------
 .../apache/asterix/translator/Receptionist.java    |   6 +-
 .../api/http/server/QueryServiceServlet.java       |   2 +-
 .../app/active/ActiveNotificationHandler.java      |  10 +-
 .../org/apache/asterix/app/cc/GlobalTxManager.java |  16 ++--
 .../asterix/app/translator/QueryTranslator.java    |   6 +-
 .../asterix/hyracks/bootstrap/CCApplication.java   |   1 +
 .../http/servlet/QueryCancellationServletTest.java |   4 +-
 .../asterix/test/active/ActiveStatsTest.java       |   6 +-
 .../test/active/TestClusterControllerActor.java    |   7 +-
 .../apache/asterix/common/api/IClientRequest.java  |  32 +++++++
 .../apache/asterix/common/api/IReceptionist.java   |   9 +-
 .../apache/asterix/common/api/IRequestTracker.java |  13 ++-
 .../runtime/job/listener/NodeJobTracker.java       |   9 +-
 .../asterix/runtime/utils/RequestTracker.java      |  52 ++++++++++
 .../runtime/job/listener/NodeJobTrackerTest.java   |   7 +-
 .../hyracks/api/job/IJobLifecycleListener.java     |  31 +++---
 .../apache/hyracks/api/job/JobSpecification.java   |  10 ++
 .../control/cc/application/CCServiceContext.java   |  16 ++--
 .../hyracks/control/cc/executor/JobExecutor.java   |   2 +-
 .../apache/hyracks/control/cc/job/IJobManager.java |  20 ++++
 .../apache/hyracks/control/cc/job/JobManager.java  | 105 +++++++++++++++++----
 .../control/cc/result/ResultDirectoryService.java  |   9 +-
 .../integration/TestJobLifecycleListener.java      |   9 +-
 24 files changed, 369 insertions(+), 111 deletions(-)

diff --cc 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index 6ded860157,84bee6a6de..524af430e6
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@@ -27,9 -27,7 +27,8 @@@ import org.apache.asterix.common.api.IR
  import org.apache.asterix.common.api.IRequestReference;
  import org.apache.asterix.common.api.ISchedulableClientRequest;
  import org.apache.asterix.common.api.RequestReference;
- import org.apache.asterix.common.dataflow.ICcApplicationContext;
  import org.apache.http.HttpHeaders;
 +import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
  import org.apache.hyracks.api.exceptions.HyracksDataException;
  import org.apache.hyracks.http.api.IServletRequest;
  import org.apache.hyracks.util.NetworkUtil;
diff --cc 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index 9d1e108b25,0000000000..c85e3b0ef4
mode 100644,000000..100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@@ -1,242 -1,0 +1,246 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.app.cc;
 +
 +import static 
org.apache.hyracks.util.ExitUtil.EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT;
 +
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +
 +import org.apache.asterix.app.message.AtomicJobCommitMessage;
 +import org.apache.asterix.app.message.AtomicJobRollbackMessage;
 +import org.apache.asterix.app.message.EnableMergeMessage;
 +import org.apache.asterix.common.cluster.IGlobalTxManager;
 +import org.apache.asterix.common.exceptions.ACIDException;
 +import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 +import org.apache.asterix.common.transactions.IGlobalTransactionContext;
 +import org.apache.asterix.common.utils.StorageConstants;
 +import 
org.apache.asterix.transaction.management.service.transaction.GlobalTransactionContext;
 +import 
org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo;
 +import org.apache.hyracks.api.application.ICCServiceContext;
 +import org.apache.hyracks.api.exceptions.HyracksException;
 +import org.apache.hyracks.api.io.FileReference;
 +import org.apache.hyracks.api.job.JobId;
 +import org.apache.hyracks.api.job.JobSpecification;
 +import org.apache.hyracks.api.job.JobStatus;
++import org.apache.hyracks.api.job.resource.IJobCapacityController;
 +import org.apache.hyracks.control.cc.ClusterControllerService;
 +import org.apache.hyracks.control.common.controllers.CCConfig;
 +import org.apache.hyracks.control.nc.io.IOManager;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 +import org.apache.hyracks.util.ExitUtil;
 +import org.apache.logging.log4j.LogManager;
 +import org.apache.logging.log4j.Logger;
 +
 +public class GlobalTxManager implements IGlobalTxManager {
 +
 +    private static final Logger LOGGER = LogManager.getLogger();
 +    private final Map<JobId, IGlobalTransactionContext> txnContextRepository 
= new ConcurrentHashMap<>();
 +    private final ICCServiceContext serviceContext;
 +    private final IOManager ioManager;
 +    public static final String GlOBAL_TX_PROPERTY_NAME = "GlobalTxProperty";
 +
 +    public GlobalTxManager(ICCServiceContext serviceContext, IOManager 
ioManager) {
 +        this.serviceContext = serviceContext;
 +        this.ioManager = ioManager;
 +    }
 +
 +    @Override
 +    public IGlobalTransactionContext beginTransaction(JobId jobId, int 
numParticipatingNodes,
 +            int numParticipatingPartitions, List<Integer> 
participatingDatasetIds) throws ACIDException {
 +        GlobalTransactionContext context = new 
GlobalTransactionContext(jobId, participatingDatasetIds,
 +                numParticipatingNodes, numParticipatingPartitions);
 +        txnContextRepository.put(jobId, context);
 +        return context;
 +    }
 +
 +    @Override
 +    public void commitTransaction(JobId jobId) throws ACIDException {
 +        IGlobalTransactionContext context = getTransactionContext(jobId);
 +        if (context.getAcksReceived() != context.getNumPartitions()) {
 +            synchronized (context) {
 +                try {
 +                    context.wait();
 +                } catch (InterruptedException e) {
 +                    Thread.currentThread().interrupt();
 +                    throw new ACIDException(e);
 +                }
 +            }
 +        }
 +        context.setTxnStatus(TransactionStatus.PREPARED);
 +        context.persist(ioManager);
 +        context.resetAcksReceived();
 +        sendJobCommitMessages(context);
 +
 +        synchronized (context) {
 +            try {
 +                CCConfig config = ((ClusterControllerService) 
serviceContext.getControllerService()).getCCConfig();
 +                context.wait(config.getGlobalTxCommitTimeout());
 +            } catch (InterruptedException e) {
 +                Thread.currentThread().interrupt();
 +                throw new ACIDException(e);
 +            }
 +        }
 +        txnContextRepository.remove(jobId);
 +    }
 +
 +    @Override
 +    public IGlobalTransactionContext getTransactionContext(JobId jobId) 
throws ACIDException {
 +        IGlobalTransactionContext context = txnContextRepository.get(jobId);
 +        if (context == null) {
 +            throw new ACIDException("Transaction for jobId " + jobId + " does 
not exist");
 +        }
 +        return context;
 +    }
 +
 +    @Override
 +    public void handleJobPreparedMessage(JobId jobId, String nodeId, 
Map<String, ILSMComponentId> componentIdMap) {
 +        IGlobalTransactionContext context = txnContextRepository.get(jobId);
 +        if (context == null) {
 +            LOGGER.warn("JobPreparedMessage received for jobId " + jobId
 +                    + ", which does not exist. The transaction for the job is 
already aborted");
 +            return;
 +        }
 +        if (context.getNodeResourceMap().containsKey(nodeId)) {
 +            context.getNodeResourceMap().get(nodeId).putAll(componentIdMap);
 +        } else {
 +            context.getNodeResourceMap().put(nodeId, componentIdMap);
 +        }
 +        if (context.incrementAndGetAcksReceived() == 
context.getNumPartitions()) {
 +            synchronized (context) {
 +                context.notifyAll();
 +            }
 +        }
 +    }
 +
 +    private void sendJobCommitMessages(IGlobalTransactionContext context) {
 +        for (String nodeId : context.getNodeResourceMap().keySet()) {
 +            AtomicJobCommitMessage message = new 
AtomicJobCommitMessage(context.getJobId(), context.getDatasetIds());
 +            try {
 +                ((ICCMessageBroker) 
serviceContext.getMessageBroker()).sendRealTimeApplicationMessageToNC(message,
 +                        nodeId);
 +            } catch (Exception e) {
 +                throw new ACIDException(e);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void handleJobCompletionMessage(JobId jobId, String nodeId) {
 +        IGlobalTransactionContext context = getTransactionContext(jobId);
 +        if (context.incrementAndGetAcksReceived() == context.getNumNodes()) {
 +            context.delete(ioManager);
 +            context.setTxnStatus(TransactionStatus.COMMITTED);
 +            synchronized (context) {
 +                context.notifyAll();
 +            }
 +            sendEnableMergeMessages(context);
 +        }
 +    }
 +
 +    @Override
 +    public void handleJobRollbackCompletionMessage(JobId jobId, String 
nodeId) {
 +        IGlobalTransactionContext context = getTransactionContext(jobId);
 +        if (context.incrementAndGetAcksReceived() == context.getNumNodes()) {
 +            context.setTxnStatus(TransactionStatus.ROLLBACK);
 +            context.delete(ioManager);
 +            synchronized (context) {
 +                context.notifyAll();
 +            }
 +        }
 +    }
 +
 +    private void sendEnableMergeMessages(IGlobalTransactionContext context) {
 +        for (String nodeId : context.getNodeResourceMap().keySet()) {
 +            for (Integer datasetId : context.getDatasetIds()) {
 +                EnableMergeMessage message = new 
EnableMergeMessage(context.getJobId(), datasetId);
 +                try {
 +                    ((ICCMessageBroker) 
serviceContext.getMessageBroker()).sendRealTimeApplicationMessageToNC(message,
 +                            nodeId);
 +                } catch (Exception e) {
 +                    throw new ACIDException(e);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void rollback() throws Exception {
 +        Set<FileReference> txnLogFileRefs = 
ioManager.list(ioManager.resolve(StorageConstants.GLOBAL_TXN_DIR_NAME));
 +        for (FileReference txnLogFileRef : txnLogFileRefs) {
 +            IGlobalTransactionContext context = new 
GlobalTransactionContext(txnLogFileRef, ioManager);
 +            txnContextRepository.put(context.getJobId(), context);
 +            sendJobRollbackMessages(context);
 +        }
 +    }
 +
 +    private void sendJobRollbackMessages(IGlobalTransactionContext context) 
throws Exception {
 +        JobId jobId = context.getJobId();
 +        for (String nodeId : context.getNodeResourceMap().keySet()) {
 +            AtomicJobRollbackMessage rollbackMessage = new 
AtomicJobRollbackMessage(jobId, context.getDatasetIds(),
 +                    context.getNodeResourceMap().get(nodeId));
 +            ((ICCMessageBroker) 
serviceContext.getMessageBroker()).sendRealTimeApplicationMessageToNC(rollbackMessage,
 +                    nodeId);
 +        }
 +        synchronized (context) {
 +            try {
 +                CCConfig config = ((ClusterControllerService) 
serviceContext.getControllerService()).getCCConfig();
 +                context.wait(config.getGlobalTxRollbackTimeout());
 +            } catch (InterruptedException e) {
 +                Thread.currentThread().interrupt();
 +                LOGGER.error("Error while rolling back atomic statement for 
{}, halting JVM", jobId);
 +                ExitUtil.halt(EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT);
 +            }
 +        }
 +        txnContextRepository.remove(jobId);
 +    }
 +
 +    @Override
 +    public void abortTransaction(JobId jobId) throws Exception {
 +        IGlobalTransactionContext context = getTransactionContext(jobId);
 +        context.resetAcksReceived();
 +        if (context.getTxnStatus() == TransactionStatus.PREPARED) {
 +            sendJobRollbackMessages(context);
 +        }
 +        txnContextRepository.remove(jobId);
 +    }
 +
 +    @Override
-     public void notifyJobCreation(JobId jobId, JobSpecification spec) throws 
HyracksException {
++    public void notifyJobCreation(JobId jobId, JobSpecification spec, 
IJobCapacityController.JobSubmissionStatus status)
++            throws HyracksException {
++
++    }
++
++    @Override
++    public void notifyJobStart(JobId jobId, JobSpecification spec) throws 
HyracksException {
 +        GlobalTxInfo globalTxInfo = (GlobalTxInfo) 
spec.getProperty(GlOBAL_TX_PROPERTY_NAME);
 +        if (globalTxInfo != null) {
 +            beginTransaction(jobId, globalTxInfo.getNumNodes(), 
globalTxInfo.getNumPartitions(),
 +                    globalTxInfo.getDatasetIds());
 +        }
 +    }
 +
 +    @Override
-     public void notifyJobStart(JobId jobId) throws HyracksException {
++    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus 
jobStatus, List<Exception> exceptions)
++            throws HyracksException {
 +
 +    }
- 
-     @Override
-     public void notifyJobFinish(JobId jobId, JobStatus jobStatus, 
List<Exception> exceptions) throws HyracksException {
-     }
 +}
diff --cc 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3035496e50,c89097b474..2ef36571c2
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@@ -5409,24 -4869,8 +5412,25 @@@ public class QueryTranslator extends Ab
              appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
              // ensure request not cancelled before running job
              ensureNotCancelled(clientRequest);
+             jobSpec.setRequestId(clientRequest.getId());
 -            final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, 
false);
 +            if (atomicStatement != null) {
 +                Dataset ds = metadataProvider.findDataset(((InsertStatement) 
atomicStatement).getDatabaseName(),
 +                        ((InsertStatement) 
atomicStatement).getDataverseName(),
 +                        ((InsertStatement) atomicStatement).getDatasetName());
 +                atomic = ds.isAtomic();
 +                if (atomic) {
 +                    int numParticipatingNodes = appCtx.getNodeJobTracker()
 +                            .getJobParticipatingNodes(jobSpec, 
LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
 +                            .size();
 +                    int numParticipatingPartitions = 
appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec,
 +                            
LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
 +                    List<Integer> participatingDatasetIds = new ArrayList<>();
 +                    participatingDatasetIds.add(ds.getDatasetId());
 +                    
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
 +                            participatingDatasetIds, numParticipatingNodes, 
numParticipatingPartitions));
 +                }
 +            }
 +            jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
              if (LOGGER.isInfoEnabled()) {
                  LOGGER.info("Created job {} for query uuid:{}, 
clientContextID:{}", jobId,
                          requestParameters.getRequestReference().getUuid(), 
requestParameters.getClientContextId());
diff --cc 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 57bff8f3d5,153eea9a69..04adce99a4
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@@ -18,8 -18,6 +18,7 @@@
   */
  package org.apache.asterix.common.api;
  
- import org.apache.asterix.common.dataflow.ICcApplicationContext;
 +import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
  import org.apache.hyracks.api.exceptions.HyracksDataException;
  import org.apache.hyracks.http.api.IServletRequest;
  
diff --cc 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index 56d3eeaca5,02f20f8e2f..d8a8b1aef9
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@@ -48,8 -45,9 +49,9 @@@ public class NodeJobTracker implements 
      private final Map<String, Set<JobId>> nodeJobs = new HashMap<>();
  
      @Override
-     public synchronized void notifyJobCreation(JobId jobId, JobSpecification 
spec) {
+     public synchronized void notifyJobCreation(JobId jobId, JobSpecification 
spec,
+             IJobCapacityController.JobSubmissionStatus status) {
 -        
getJobParticipatingNodes(spec).stream().map(nodeJobs::get).forEach(jobsSet -> 
jobsSet.add(jobId));
 +        getJobParticipatingNodes(spec, 
null).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
      }
  
      @Override
diff --cc 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
index 008be29584,4d4635aadc..19fdcfee01
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
@@@ -47,7 -48,7 +48,8 @@@ public class TestJobLifecycleListener i
      private final Set<JobId> finishWithoutStart = new HashSet<>();
  
      @Override
-     public void notifyJobCreation(JobId jobId, JobSpecification spec) throws 
HyracksException {
 -    public void notifyJobCreation(JobId jobId, JobSpecification spec, 
IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
++    public void notifyJobCreation(JobId jobId, JobSpecification spec, 
IJobCapacityController.JobSubmissionStatus status)
++            throws HyracksException {
          if (created.containsKey(jobId)) {
              LOGGER.log(Level.WARN, "Job " + jobId + "has been created 
before");
              increment(doubleCreated, jobId);
@@@ -75,7 -76,7 +77,8 @@@
      }
  
      @Override
-     public void notifyJobFinish(JobId jobId, JobStatus jobStatus, 
List<Exception> exceptions) throws HyracksException {
 -    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus 
jobStatus, List<Exception> exceptions) throws HyracksException {
++    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus 
jobStatus, List<Exception> exceptions)
++            throws HyracksException {
          if (!started.contains(jobId)) {
              LOGGER.log(Level.WARN, "Job " + jobId + "has not been started");
              finishWithoutStart.add(jobId);

Reply via email to