This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 835bc42b37e780a52e25f00a1e431f82d9d774cd Merge: 746e3f5fea 7c7d24096e Author: Michael Blow <[email protected]> AuthorDate: Thu May 9 22:10:53 2024 -0400 Merge branch 'gerrit/trinity' into 'master' Change-Id: I4408569b9d202ef120fe042781710cf04af884a1 .../apache/asterix/translator/ClientRequest.java | 98 ++++++++++++------- .../apache/asterix/translator/Receptionist.java | 6 +- .../api/http/server/QueryServiceServlet.java | 2 +- .../app/active/ActiveNotificationHandler.java | 10 +- .../org/apache/asterix/app/cc/GlobalTxManager.java | 16 ++-- .../asterix/app/translator/QueryTranslator.java | 6 +- .../asterix/hyracks/bootstrap/CCApplication.java | 1 + .../http/servlet/QueryCancellationServletTest.java | 4 +- .../asterix/test/active/ActiveStatsTest.java | 6 +- .../test/active/TestClusterControllerActor.java | 7 +- .../apache/asterix/common/api/IClientRequest.java | 32 +++++++ .../apache/asterix/common/api/IReceptionist.java | 9 +- .../apache/asterix/common/api/IRequestTracker.java | 13 ++- .../runtime/job/listener/NodeJobTracker.java | 9 +- .../asterix/runtime/utils/RequestTracker.java | 52 ++++++++++ .../runtime/job/listener/NodeJobTrackerTest.java | 7 +- .../hyracks/api/job/IJobLifecycleListener.java | 31 +++--- .../apache/hyracks/api/job/JobSpecification.java | 10 ++ .../control/cc/application/CCServiceContext.java | 16 ++-- .../hyracks/control/cc/executor/JobExecutor.java | 2 +- .../apache/hyracks/control/cc/job/IJobManager.java | 20 ++++ .../apache/hyracks/control/cc/job/JobManager.java | 105 +++++++++++++++++---- .../control/cc/result/ResultDirectoryService.java | 9 +- .../integration/TestJobLifecycleListener.java | 9 +- 24 files changed, 369 insertions(+), 111 deletions(-) diff --cc asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java index 6ded860157,84bee6a6de..524af430e6 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java @@@ -27,9 -27,7 +27,8 @@@ import org.apache.asterix.common.api.IR import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.common.api.ISchedulableClientRequest; import org.apache.asterix.common.api.RequestReference; - import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.http.HttpHeaders; +import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.util.NetworkUtil; diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java index 9d1e108b25,0000000000..c85e3b0ef4 mode 100644,000000..100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java @@@ -1,242 -1,0 +1,246 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.cc; + +import static org.apache.hyracks.util.ExitUtil.EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.asterix.app.message.AtomicJobCommitMessage; +import org.apache.asterix.app.message.AtomicJobRollbackMessage; +import org.apache.asterix.app.message.EnableMergeMessage; +import org.apache.asterix.common.cluster.IGlobalTxManager; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.transactions.IGlobalTransactionContext; +import org.apache.asterix.common.utils.StorageConstants; +import org.apache.asterix.transaction.management.service.transaction.GlobalTransactionContext; +import org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo; +import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; ++import org.apache.hyracks.api.job.resource.IJobCapacityController; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.nc.io.IOManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.util.ExitUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class GlobalTxManager implements IGlobalTxManager { + + private static final Logger LOGGER = LogManager.getLogger(); + private final Map<JobId, IGlobalTransactionContext> txnContextRepository = new ConcurrentHashMap<>(); + private final ICCServiceContext serviceContext; + private final IOManager ioManager; + public static final String GlOBAL_TX_PROPERTY_NAME = "GlobalTxProperty"; + + public GlobalTxManager(ICCServiceContext serviceContext, IOManager ioManager) { + this.serviceContext = serviceContext; + this.ioManager = ioManager; + } + + @Override + public IGlobalTransactionContext beginTransaction(JobId jobId, int numParticipatingNodes, + int numParticipatingPartitions, List<Integer> participatingDatasetIds) throws ACIDException { + GlobalTransactionContext context = new GlobalTransactionContext(jobId, participatingDatasetIds, + numParticipatingNodes, numParticipatingPartitions); + txnContextRepository.put(jobId, context); + return context; + } + + @Override + public void commitTransaction(JobId jobId) throws ACIDException { + IGlobalTransactionContext context = getTransactionContext(jobId); + if (context.getAcksReceived() != context.getNumPartitions()) { + synchronized (context) { + try { + context.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ACIDException(e); + } + } + } + context.setTxnStatus(TransactionStatus.PREPARED); + context.persist(ioManager); + context.resetAcksReceived(); + sendJobCommitMessages(context); + + synchronized (context) { + try { + CCConfig config = ((ClusterControllerService) serviceContext.getControllerService()).getCCConfig(); + context.wait(config.getGlobalTxCommitTimeout()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ACIDException(e); + } + } + txnContextRepository.remove(jobId); + } + + @Override + public IGlobalTransactionContext getTransactionContext(JobId jobId) throws ACIDException { + IGlobalTransactionContext context = txnContextRepository.get(jobId); + if (context == null) { + throw new ACIDException("Transaction for jobId " + jobId + " does not exist"); + } + return context; + } + + @Override + public void handleJobPreparedMessage(JobId jobId, String nodeId, Map<String, ILSMComponentId> componentIdMap) { + IGlobalTransactionContext context = txnContextRepository.get(jobId); + if (context == null) { + LOGGER.warn("JobPreparedMessage received for jobId " + jobId + + ", which does not exist. The transaction for the job is already aborted"); + return; + } + if (context.getNodeResourceMap().containsKey(nodeId)) { + context.getNodeResourceMap().get(nodeId).putAll(componentIdMap); + } else { + context.getNodeResourceMap().put(nodeId, componentIdMap); + } + if (context.incrementAndGetAcksReceived() == context.getNumPartitions()) { + synchronized (context) { + context.notifyAll(); + } + } + } + + private void sendJobCommitMessages(IGlobalTransactionContext context) { + for (String nodeId : context.getNodeResourceMap().keySet()) { + AtomicJobCommitMessage message = new AtomicJobCommitMessage(context.getJobId(), context.getDatasetIds()); + try { + ((ICCMessageBroker) serviceContext.getMessageBroker()).sendRealTimeApplicationMessageToNC(message, + nodeId); + } catch (Exception e) { + throw new ACIDException(e); + } + } + } + + @Override + public void handleJobCompletionMessage(JobId jobId, String nodeId) { + IGlobalTransactionContext context = getTransactionContext(jobId); + if (context.incrementAndGetAcksReceived() == context.getNumNodes()) { + context.delete(ioManager); + context.setTxnStatus(TransactionStatus.COMMITTED); + synchronized (context) { + context.notifyAll(); + } + sendEnableMergeMessages(context); + } + } + + @Override + public void handleJobRollbackCompletionMessage(JobId jobId, String nodeId) { + IGlobalTransactionContext context = getTransactionContext(jobId); + if (context.incrementAndGetAcksReceived() == context.getNumNodes()) { + context.setTxnStatus(TransactionStatus.ROLLBACK); + context.delete(ioManager); + synchronized (context) { + context.notifyAll(); + } + } + } + + private void sendEnableMergeMessages(IGlobalTransactionContext context) { + for (String nodeId : context.getNodeResourceMap().keySet()) { + for (Integer datasetId : context.getDatasetIds()) { + EnableMergeMessage message = new EnableMergeMessage(context.getJobId(), datasetId); + try { + ((ICCMessageBroker) serviceContext.getMessageBroker()).sendRealTimeApplicationMessageToNC(message, + nodeId); + } catch (Exception e) { + throw new ACIDException(e); + } + } + } + } + + @Override + public void rollback() throws Exception { + Set<FileReference> txnLogFileRefs = ioManager.list(ioManager.resolve(StorageConstants.GLOBAL_TXN_DIR_NAME)); + for (FileReference txnLogFileRef : txnLogFileRefs) { + IGlobalTransactionContext context = new GlobalTransactionContext(txnLogFileRef, ioManager); + txnContextRepository.put(context.getJobId(), context); + sendJobRollbackMessages(context); + } + } + + private void sendJobRollbackMessages(IGlobalTransactionContext context) throws Exception { + JobId jobId = context.getJobId(); + for (String nodeId : context.getNodeResourceMap().keySet()) { + AtomicJobRollbackMessage rollbackMessage = new AtomicJobRollbackMessage(jobId, context.getDatasetIds(), + context.getNodeResourceMap().get(nodeId)); + ((ICCMessageBroker) serviceContext.getMessageBroker()).sendRealTimeApplicationMessageToNC(rollbackMessage, + nodeId); + } + synchronized (context) { + try { + CCConfig config = ((ClusterControllerService) serviceContext.getControllerService()).getCCConfig(); + context.wait(config.getGlobalTxRollbackTimeout()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("Error while rolling back atomic statement for {}, halting JVM", jobId); + ExitUtil.halt(EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT); + } + } + txnContextRepository.remove(jobId); + } + + @Override + public void abortTransaction(JobId jobId) throws Exception { + IGlobalTransactionContext context = getTransactionContext(jobId); + context.resetAcksReceived(); + if (context.getTxnStatus() == TransactionStatus.PREPARED) { + sendJobRollbackMessages(context); + } + txnContextRepository.remove(jobId); + } + + @Override - public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException { ++ public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status) ++ throws HyracksException { ++ ++ } ++ ++ @Override ++ public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException { + GlobalTxInfo globalTxInfo = (GlobalTxInfo) spec.getProperty(GlOBAL_TX_PROPERTY_NAME); + if (globalTxInfo != null) { + beginTransaction(jobId, globalTxInfo.getNumNodes(), globalTxInfo.getNumPartitions(), + globalTxInfo.getDatasetIds()); + } + } + + @Override - public void notifyJobStart(JobId jobId) throws HyracksException { ++ public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions) ++ throws HyracksException { + + } - - @Override - public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException { - } +} diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 3035496e50,c89097b474..2ef36571c2 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@@ -5409,24 -4869,8 +5412,25 @@@ public class QueryTranslator extends Ab appCtx.getReceptionist().ensureSchedulable(schedulableRequest); // ensure request not cancelled before running job ensureNotCancelled(clientRequest); + jobSpec.setRequestId(clientRequest.getId()); - final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); + if (atomicStatement != null) { + Dataset ds = metadataProvider.findDataset(((InsertStatement) atomicStatement).getDatabaseName(), + ((InsertStatement) atomicStatement).getDataverseName(), + ((InsertStatement) atomicStatement).getDatasetName()); + atomic = ds.isAtomic(); + if (atomic) { + int numParticipatingNodes = appCtx.getNodeJobTracker() + .getJobParticipatingNodes(jobSpec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class) + .size(); + int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec, + LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); + List<Integer> participatingDatasetIds = new ArrayList<>(); + participatingDatasetIds.add(ds.getDatasetId()); + jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo( + participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); + } + } + jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); if (LOGGER.isInfoEnabled()) { LOGGER.info("Created job {} for query uuid:{}, clientContextID:{}", jobId, requestParameters.getRequestReference().getUuid(), requestParameters.getClientContextId()); diff --cc asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java index 57bff8f3d5,153eea9a69..04adce99a4 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java @@@ -18,8 -18,6 +18,7 @@@ */ package org.apache.asterix.common.api; - import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.http.api.IServletRequest; diff --cc asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java index 56d3eeaca5,02f20f8e2f..d8a8b1aef9 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java @@@ -48,8 -45,9 +49,9 @@@ public class NodeJobTracker implements private final Map<String, Set<JobId>> nodeJobs = new HashMap<>(); @Override - public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) { + public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec, + IJobCapacityController.JobSubmissionStatus status) { - getJobParticipatingNodes(spec).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId)); + getJobParticipatingNodes(spec, null).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId)); } @Override diff --cc hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java index 008be29584,4d4635aadc..19fdcfee01 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java @@@ -47,7 -48,7 +48,8 @@@ public class TestJobLifecycleListener i private final Set<JobId> finishWithoutStart = new HashSet<>(); @Override - public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException { - public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status) throws HyracksException { ++ public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status) ++ throws HyracksException { if (created.containsKey(jobId)) { LOGGER.log(Level.WARN, "Job " + jobId + "has been created before"); increment(doubleCreated, jobId); @@@ -75,7 -76,7 +77,8 @@@ } @Override - public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException { - public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException { ++ public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions) ++ throws HyracksException { if (!started.contains(jobId)) { LOGGER.log(Level.WARN, "Job " + jobId + "has not been started"); finishWithoutStart.add(jobId);
