Repository: asterixdb
Updated Branches:
  refs/heads/master 2d059fd1c -> 9d0e21cea
[ASTERIXDB-1706][RT] Use System.nanoTime For Result Timestamp

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Use System.nanoTime for result timestamp to avoid
  results being incorrectly swept due to
  System.currentTimeMillis system time adjustments.
- Move sweep logic to AbstractDatasetManager.

Change-Id: I388d2a477bcfdc47d11dc6a4873483b82c9fadbf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2315
Sonar-Qube: Jenkins <[email protected]>
Reviewed-by: Michael Blow <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/9d0e21ce
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/9d0e21ce
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/9d0e21ce

Branch: refs/heads/master
Commit: 9d0e21cea4d431933bbd9ba3819b9802a1e4c0fc
Parents: 2d059fd
Author: Murtadha Hubail <[email protected]>
Authored: Wed Jan 24 02:52:33 2018 +0300
Committer: Murtadha Hubail <[email protected]>
Committed: Tue Jan 23 18:07:58 2018 -0800

----------------------------------------------------------------------
 .../hyracks/api/dataset/DatasetJobRecord.java   |  2 +-
 .../hyracks/api/dataset/IDatasetManager.java    | 14 +++--
 .../cc/dataset/DatasetDirectoryService.java     | 18 +++----
 .../common/dataset/AbstractDatasetManager.java  | 55 ++++++++++++++++++++
 .../common/dataset/ResultStateSweeper.java      | 44 +++-------------
 .../nc/dataset/DatasetPartitionManager.java     | 18 ++-----
 .../control/nc/dataset/ResultSetMap.java        |  2 +-
 7 files changed, 83 insertions(+), 70 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java index 55f1d7c..4e7ddda 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java @@ -84,7 +84,7 @@ public class DatasetJobRecord implements IDatasetStateRecord { private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>(); public DatasetJobRecord() { - this.timestamp = System.currentTimeMillis(); + this.timestamp = System.nanoTime(); this.status = new Status(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java index c8463d3..a0c1f78 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java @@ -24,11 +24,15 @@ import org.apache.hyracks.api.job.JobId; public interface IDatasetManager { - public Set<JobId> getJobIds(); + Set<JobId> getJobIds(); - public IDatasetStateRecord getState(JobId jobId); + IDatasetStateRecord getState(JobId jobId); - public void deinitState(JobId jobId); + void sweep(JobId jobId); - public long getResultTimestamp(JobId jobId); -} + /** + * Removes all references and deletes persisted files for + * all expired datasets. 
+ */ + void sweepExpiredDatasets(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java index a57baf5..04aaddd 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java @@ -40,6 +40,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.control.common.dataset.AbstractDatasetManager; import org.apache.hyracks.control.common.dataset.ResultStateSweeper; import org.apache.hyracks.control.common.work.IResultCallback; import org.apache.logging.log4j.Level; @@ -53,25 +54,23 @@ import org.apache.logging.log4j.Logger; * the job (after it receives all the results) completely. Then we can just get rid of the location information for that * job. 
*/ -public class DatasetDirectoryService implements IDatasetDirectoryService { +public class DatasetDirectoryService extends AbstractDatasetManager implements IDatasetDirectoryService { private static final Logger LOGGER = LogManager.getLogger(); - private final long resultTTL; - private final long resultSweepThreshold; private final Map<JobId, JobResultInfo> jobResultLocations; public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) { - this.resultTTL = resultTTL; + super(resultTTL); this.resultSweepThreshold = resultSweepThreshold; - jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>(); + jobResultLocations = new LinkedHashMap<>(); } @Override public void init(ExecutorService executor) { - executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER)); + executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER)); } @Override @@ -181,12 +180,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { } @Override - public synchronized long getResultTimestamp(JobId jobId) { - return getState(jobId).getTimestamp(); - } - - @Override - public synchronized void deinitState(JobId jobId) { + public synchronized void sweep(JobId jobId) { jobResultLocations.remove(jobId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java new file mode 100644 index 0000000..f95229e --- /dev/null +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.common.dataset; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hyracks.api.dataset.IDatasetManager; +import org.apache.hyracks.api.dataset.IDatasetStateRecord; +import org.apache.hyracks.api.job.JobId; + +public abstract class AbstractDatasetManager implements IDatasetManager { + + private final long nanoResultTTL; + + protected AbstractDatasetManager(long resultTTL) { + this.nanoResultTTL = TimeUnit.MILLISECONDS.toNanos(resultTTL); + } + + @Override + public synchronized void sweepExpiredDatasets() { + final List<JobId> expiredDatasets = new ArrayList<>(); + final long sweepTime = System.nanoTime(); + for (JobId jobId : getJobIds()) { + final IDatasetStateRecord state = getState(jobId); + if (state != null && hasExpired(state, sweepTime, nanoResultTTL)) { + expiredDatasets.add(jobId); + } + } + for (JobId jobId : expiredDatasets) { + sweep(jobId); + } + } + + private static boolean 
hasExpired(IDatasetStateRecord dataset, long currentTime, long ttl) { + return currentTime - dataset.getTimestamp() - ttl > 0; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java index a9ca771..901ec67 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java @@ -19,12 +19,7 @@ package org.apache.hyracks.control.common.dataset; -import java.util.ArrayList; -import java.util.List; - import org.apache.hyracks.api.dataset.IDatasetManager; -import org.apache.hyracks.api.job.JobId; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; /** @@ -33,53 +28,26 @@ import org.apache.logging.log4j.Logger; public class ResultStateSweeper implements Runnable { private final IDatasetManager datasetManager; - - private final long resultTTL; - private final long resultSweepThreshold; - private final Logger logger; - private final List<JobId> toBeCollected; - - public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold, - Logger logger) { + public ResultStateSweeper(IDatasetManager datasetManager, long resultSweepThreshold, Logger logger) { this.datasetManager = datasetManager; - this.resultTTL = resultTTL; this.resultSweepThreshold = 
resultSweepThreshold; this.logger = logger; - toBeCollected = new ArrayList<JobId>(); } @Override - @SuppressWarnings("squid:S2142") // catch interrupted exception public void run() { - while (true) { + while (!Thread.currentThread().isInterrupted()) { try { Thread.sleep(resultSweepThreshold); - sweep(); + datasetManager.sweepExpiredDatasets(); + logger.trace("Result state cleanup instance successfully completed."); } catch (InterruptedException e) { - logger.log(Level.WARN, "Result cleaner thread interrupted, shutting down."); - break; // the interrupt was explicit from another thread. This thread should shut down... + logger.warn("Result cleaner thread interrupted, shutting down."); + Thread.currentThread().interrupt(); } } } - - private void sweep() { - synchronized (datasetManager) { - toBeCollected.clear(); - for (JobId jobId : datasetManager.getJobIds()) { - final long timestamp = datasetManager.getResultTimestamp(jobId); - if (timestamp != -1 && System.currentTimeMillis() > timestamp + resultTTL) { - toBeCollected.add(jobId); - } - } - for (JobId jobId : toBeCollected) { - datasetManager.deinitState(jobId); - } - } - if (logger.isTraceEnabled()) { - logger.trace("Result state cleanup instance successfully completed."); - } - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java index d381a67..476aeae 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java @@ -26,11 +26,11 @@ import java.util.concurrent.Executor; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; -import org.apache.hyracks.api.dataset.IDatasetStateRecord; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.io.IWorkspaceFileFactory; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.common.dataset.AbstractDatasetManager; import org.apache.hyracks.control.common.dataset.ResultStateSweeper; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.control.nc.io.WorkspaceFileFactory; @@ -38,7 +38,7 @@ import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class DatasetPartitionManager implements IDatasetPartitionManager { +public class DatasetPartitionManager extends AbstractDatasetManager implements IDatasetPartitionManager { private static final Logger LOGGER = LogManager.getLogger(); private final NodeControllerService ncs; @@ -55,6 +55,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager { public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL, long resultSweepThreshold) { + super(resultTTL); this.ncs = ncs; this.executor = executor; deallocatableRegistry = new DefaultDeallocatableRegistry(); @@ -65,7 +66,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager { datasetMemoryManager = null; } 
partitionResultStateMap = new LinkedHashMap<>(); - executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER)); + executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER)); } @Override @@ -179,16 +180,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager { } @Override - public synchronized long getResultTimestamp(JobId jobId) { - IDatasetStateRecord r = getState(jobId); - if (r == null) { - return -1; - } - return r.getTimestamp(); - } - - @Override - public synchronized void deinitState(JobId jobId) { + public synchronized void sweep(JobId jobId) { deinit(jobId); partitionResultStateMap.remove(jobId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d0e21ce/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java index 3957401..1a64a5a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java @@ -37,7 +37,7 @@ class ResultSetMap implements IDatasetStateRecord, Serializable { private final HashMap<ResultSetId, ResultState[]> resultStateMap; ResultSetMap() { - timestamp = System.currentTimeMillis(); + timestamp = System.nanoTime(); resultStateMap = new HashMap<>(); }
