Repository: asterixdb Updated Branches: refs/heads/master 8b514a263 -> ff016514c
[ASTERIXDB-2506][STO] Checkpoint Datasets - user model changes: no - storage format changes: no - interface changes: yes Details: - Add new config to specify dataset checkpoint interval and default its value to 10 minutes. - Flush datasets which were not flushed for the dataset checkpoint interval. - Run dataset checkpoint logic as part of the CheckpointThread. - Improve dataset async flush API to accept any LSM index predicate. - Cleanup exception handling in CheckpointThread. - Add test case for dataset checkpoint. Change-Id: I38a73a43a4b1b7d3a8ac79dd579ed4ef8c9c6a9b Reviewed-on: https://asterix-gerrit.ics.uci.edu/3106 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ff016514 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ff016514 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ff016514 Branch: refs/heads/master Commit: ff016514cca70c50085f07cb19400dd70e8ff01f Parents: 8b514a2 Author: Murtadha Hubail <[email protected]> Authored: Sun Jan 6 22:26:14 2019 +0300 Committer: Murtadha Hubail <[email protected]> Committed: Thu Jan 10 15:57:49 2019 -0800 ---------------------------------------------------------------------- .../test/runtime/StorageExecutionTest.java | 64 ++++++++++++++++++++ .../src/test/resources/cc-storage.conf | 57 +++++++++++++++++ .../runtimets/only_testsuite_storage.xml | 20 ++++++ .../dataset-checkpoint.1.ddl.sqlpp | 25 ++++++++ .../dataset-checkpoint.2.update.sqlpp | 20 ++++++ .../dataset-checkpoint.3.pollquery.sqlpp | 24 ++++++++ .../dataset-checkpoint.4.ddl.sqlpp | 19 ++++++ .../cluster_state_1/cluster_state_1.1.regexadm | 1 + .../cluster_state_1_full.1.regexadm | 1 + .../cluster_state_1_less.1.regexadm | 1 + 
.../dataset-checkpoint/dataset-checkpoint.3.adm | 1 + .../resources/runtimets/testsuite_storage.xml | 27 +++++++++ .../common/api/IDatasetLifecycleManager.java | 8 ++- .../common/config/TransactionProperties.java | 9 +++ .../common/context/DatasetLifecycleManager.java | 37 ++++++----- .../context/PrimaryIndexOperationTracker.java | 13 ++++ .../transactions/CheckpointProperties.java | 9 ++- .../common/transactions/ICheckpointManager.java | 7 +++ .../service/recovery/CheckpointManager.java | 43 ++++++++++--- .../service/recovery/CheckpointThread.java | 37 +++++------ 20 files changed, 372 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/StorageExecutionTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/StorageExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/StorageExecutionTest.java new file mode 100644 index 0000000..8c050a5 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/StorageExecutionTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.runtime; + +import java.util.Collection; + +import org.apache.asterix.test.common.TestExecutor; +import org.apache.asterix.testframework.context.TestCaseContext; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * Runs storage runtime tests + */ +@RunWith(Parameterized.class) +public class StorageExecutionTest { + protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-storage.conf"; + + @BeforeClass + public static void setUp() throws Exception { + LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor()); + } + + @AfterClass + public static void tearDown() throws Exception { + LangExecutionUtil.tearDown(); + } + + @Parameters(name = "StorageExecutionTest {index}: {0}") + public static Collection<Object[]> tests() throws Exception { + return LangExecutionUtil.tests("only_testsuite_storage.xml", "testsuite_storage.xml"); + } + + protected TestCaseContext tcCtx; + + public StorageExecutionTest(TestCaseContext tcCtx) { + this.tcCtx = tcCtx; + } + + @Test + public void test() throws Exception { + LangExecutionUtil.test(tcCtx); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-app/src/test/resources/cc-storage.conf ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/cc-storage.conf 
b/asterixdb/asterix-app/src/test/resources/cc-storage.conf new file mode 100644 index 0000000..b6bed24 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/cc-storage.conf @@ -0,0 +1,57 @@ +; Licensed to the Apache Software Foundation (ASF) under one +; or more contributor license agreements. See the NOTICE file +; distributed with this work for additional information +; regarding copyright ownership. The ASF licenses this file +; to you under the Apache License, Version 2.0 (the +; "License"); you may not use this file except in compliance +; with the License. You may obtain a copy of the License at +; +; http://www.apache.org/licenses/LICENSE-2.0 +; +; Unless required by applicable law or agreed to in writing, +; software distributed under the License is distributed on an +; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +; KIND, either express or implied. See the License for the +; specific language governing permissions and limitations +; under the License. + +[nc/asterix_nc1] +txn.log.dir=target/tmp/asterix_nc1/txnlog +core.dump.dir=target/tmp/asterix_nc1/coredump +iodevices=target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2 +nc.api.port=19004 + +[nc/asterix_nc2] +ncservice.port=9091 +txn.log.dir=target/tmp/asterix_nc2/txnlog +core.dump.dir=target/tmp/asterix_nc2/coredump +iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2 +nc.api.port=19005 + +[nc] +address=127.0.0.1 +command=asterixnc +app.class=org.apache.asterix.hyracks.bootstrap.NCApplication +jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory" +storage.buffercache.pagesize=32KB +storage.buffercache.size=21MB +storage.memorycomponent.globalbudget=512MB +storage.compression.block=snappy + +[cc] +address = 127.0.0.1 +app.class=org.apache.asterix.hyracks.bootstrap.CCApplication +heartbeat.period=2000 +heartbeat.max.misses=25 + +[common] +log.level = INFO 
+compiler.framesize=32KB +compiler.sortmemory=320KB +compiler.groupmemory=160KB +compiler.joinmemory=256KB +compiler.textsearchmemory=160KB +messaging.frame.size=4096 +messaging.frame.count=512 +txn.log.checkpoint.pollfrequency=10 +txn.dataset.checkpoint.interval=10 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-app/src/test/resources/runtimets/only_testsuite_storage.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/only_testsuite_storage.xml b/asterixdb/asterix-app/src/test/resources/runtimets/only_testsuite_storage.xml new file mode 100644 index 0000000..bd34ae0 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/only_testsuite_storage.xml @@ -0,0 +1,20 @@ +<!-- + ! Licensed to the Apache Software Foundation (ASF) under one + ! or more contributor license agreements. See the NOTICE file + ! distributed with this work for additional information + ! regarding copyright ownership. The ASF licenses this file + ! to you under the Apache License, Version 2.0 (the + ! "License"); you may not use this file except in compliance + ! with the License. You may obtain a copy of the License at + ! + ! http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, + ! software distributed under the License is distributed on an + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ! KIND, either express or implied. See the License for the + ! specific language governing permissions and limitations + ! under the License. 
+ !--> +<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp" SourceLocation="true"> +</test-suite> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.1.ddl.sqlpp new file mode 100644 index 0000000..de04c53 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.1.ddl.sqlpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +DROP DATAVERSE test IF EXISTS; +CREATE DATAVERSE test; + +USE test; + +CREATE TYPE KeyType IF NOT EXISTS AS { id: int }; +CREATE DATASET ds(KeyType) PRIMARY KEY id; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.2.update.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.2.update.sqlpp new file mode 100644 index 0000000..e7c84d4 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.2.update.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +USE test; +UPSERT INTO ds ({"id": 1}); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.3.pollquery.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.3.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.3.pollquery.sqlpp new file mode 100644 index 0000000..821b2f3 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.3.pollquery.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +-- polltimeoutsecs=180 +SET `import-private-functions` `true`; +USE test; +SELECT VALUE COUNT(*) +FROM storage_components('test','ds') resource +WHERE array_length(resource.components) > 0; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.4.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.4.ddl.sqlpp new file mode 100644 index 0000000..d195714 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.4.ddl.sqlpp @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +DROP DATAVERSE test; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm index 5faf4d8..da464c7 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm @@ -38,6 +38,7 @@ "storage.max.active.writable.datasets" : 8, "txn\.commitprofiler\.enabled" : false, "txn\.commitprofiler\.reportinterval" : 5, + "txn\.dataset\.checkpoint\.interval" : 600, "txn\.job\.recovery\.memorysize" : 67108864, "txn\.lock\.escalationthreshold" : 1000, "txn\.lock\.shrinktimer" : 5000, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm index e30c879..fa8f48e 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm @@ -38,6 +38,7 @@ "storage.max.active.writable.datasets" : 8, "txn\.commitprofiler\.enabled" : false, "txn\.commitprofiler\.reportinterval" : 
5, + "txn\.dataset\.checkpoint\.interval" : 600, "txn\.job\.recovery\.memorysize" : 67108864, "txn\.lock\.escalationthreshold" : 1000, "txn\.lock\.shrinktimer" : 5000, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm index ce5add1..801900c 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm @@ -38,6 +38,7 @@ "storage.max.active.writable.datasets" : 8, "txn\.commitprofiler\.enabled" : false, "txn\.commitprofiler\.reportinterval" : 5, + "txn\.dataset\.checkpoint\.interval" : 600, "txn\.job\.recovery\.memorysize" : 67108864, "txn\.lock\.escalationthreshold" : 1000, "txn\.lock\.shrinktimer" : 5000, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-app/src/test/resources/runtimets/results/storage/dataset-checkpoint/dataset-checkpoint.3.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/storage/dataset-checkpoint/dataset-checkpoint.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/storage/dataset-checkpoint/dataset-checkpoint.3.adm new file mode 100644 index 0000000..56a6051 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/storage/dataset-checkpoint/dataset-checkpoint.3.adm @@ -0,0 +1 @@ +1 \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_storage.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_storage.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_storage.xml new file mode 100644 index 0000000..4f2797f --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_storage.xml @@ -0,0 +1,27 @@ +<!-- + ! Licensed to the Apache Software Foundation (ASF) under one + ! or more contributor license agreements. See the NOTICE file + ! distributed with this work for additional information + ! regarding copyright ownership. The ASF licenses this file + ! to you under the Apache License, Version 2.0 (the + ! "License"); you may not use this file except in compliance + ! with the License. You may obtain a copy of the License at + ! + ! http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, + ! software distributed under the License is distributed on an + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ! KIND, either express or implied. See the License for the + ! specific language governing permissions and limitations + ! under the License. 
+ !--> +<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp" SourceLocation="true"> + <test-group name="storage"> + <test-case FilePath="storage"> + <compilation-unit name="dataset-checkpoint"> + <output-dir compare="Text">dataset-checkpoint</output-dir> + </compilation-unit> + </test-case> + </test-group> +</test-suite> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java index d18b6ab..954c209 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.api; import java.util.List; +import java.util.function.Predicate; import org.apache.asterix.common.context.DatasetInfo; import org.apache.asterix.common.context.IndexInfo; @@ -26,6 +27,7 @@ import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.IResourceLifecycleManager; @@ -56,12 +58,12 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd void 
flushAllDatasets() throws HyracksDataException; /** - * Schedules asynchronous flush on datasets that have memory components with first LSN < nonSharpCheckpointTargetLSN. + * Schedules asynchronous flush on indexes matching the predicate {@code indexPredicate} * - * @param nonSharpCheckpointTargetLSN + * @param indexPredicate * @throws HyracksDataException */ - void scheduleAsyncFlushForLaggingDatasets(long nonSharpCheckpointTargetLSN) throws HyracksDataException; + void asyncFlushMatchingIndexes(Predicate<ILSMIndex> indexPredicate) throws HyracksDataException; /** * creates (if necessary) and returns the dataset info. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java index 6ff51ca..fb6ca6b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java @@ -26,6 +26,7 @@ import static org.apache.hyracks.control.common.config.OptionTypes.UNSIGNED_INTE import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.IOptionType; @@ -35,6 +36,10 @@ import org.apache.hyracks.util.StorageUtil; public class TransactionProperties extends AbstractProperties { public enum Option implements IOption { + TXN_DATASET_CHECKPOINT_INTERVAL( + POSITIVE_INTEGER, + (int) TimeUnit.MINUTES.toSeconds(10), + "The interval (in seconds) after which a dataset is considered idle and persisted to disk"), 
TXN_LOG_BUFFER_NUMPAGES(POSITIVE_INTEGER, 8, "The number of pages in the transaction log tail"), TXN_LOG_BUFFER_PAGESIZE( INTEGER_BYTE_UNIT, @@ -173,4 +178,8 @@ public class TransactionProperties extends AbstractProperties { public long getJobRecoveryMemorySize() { return accessor.getLong(Option.TXN_JOB_RECOVERY_MEMORYSIZE); } + + public int getDatasetCheckpointInterval() { + return accessor.getInt(Option.TXN_DATASET_CHECKPOINT_INTERVAL); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 486cd45..61ffadc 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.IDatasetMemoryManager; @@ -390,35 +391,27 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } @Override - public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException { - //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN + public synchronized void asyncFlushMatchingIndexes(Predicate<ILSMIndex> indexPredicate) + throws HyracksDataException { for (DatasetResource dsr : datasets.values()) { for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) { - // 
check all partitions synchronized (opTracker) { - scheduleAsyncFlushForLaggingDatasetPartition(dsr, opTracker, targetLSN); + asyncFlush(dsr, opTracker, indexPredicate); } } } } - private void scheduleAsyncFlushForLaggingDatasetPartition(DatasetResource dsr, - PrimaryIndexOperationTracker opTracker, long targetLSN) throws HyracksDataException { - int partition = opTracker.getPartition(); + private void asyncFlush(DatasetResource dsr, PrimaryIndexOperationTracker opTracker, + Predicate<ILSMIndex> indexPredicate) throws HyracksDataException { + final int partition = opTracker.getPartition(); for (ILSMIndex lsmIndex : dsr.getDatasetInfo().getDatasetPartitionOpenIndexes(partition)) { LSMIOOperationCallback ioCallback = (LSMIOOperationCallback) lsmIndex.getIOOperationCallback(); - if (!(lsmIndex.isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush() - || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) { - long firstLSN = ioCallback.getPersistenceLsn(); - if (firstLSN < targetLSN) { - LOGGER.info("Checkpoint flush dataset {} partition {}", dsr.getDatasetID(), partition); - opTracker.setFlushOnExit(true); - if (opTracker.getNumActiveOperations() == 0) { - // No Modify operations currently, we need to trigger the flush and we can do so safely - opTracker.flushIfRequested(); - } - break; - } + if (needsFlush(opTracker, lsmIndex, ioCallback) && indexPredicate.test(lsmIndex)) { + LOGGER.info("Async flushing {}", opTracker); + opTracker.setFlushOnExit(true); + opTracker.flushIfNeeded(); + break; } } } @@ -623,4 +616,10 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC throw new IllegalStateException(e); } } + + private static boolean needsFlush(PrimaryIndexOperationTracker opTracker, ILSMIndex lsmIndex, + LSMIOOperationCallback ioCallback) throws HyracksDataException { + return !(lsmIndex.isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush() + || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit()); + } 
} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 59e19ca..806a6d4 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -51,9 +51,11 @@ import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; +import org.apache.hyracks.util.annotations.NotThreadSafe; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +@NotThreadSafe public class PrimaryIndexOperationTracker extends BaseOperationTracker implements IoOperationCompleteListener { private static final Logger LOGGER = LogManager.getLogger(); private final int partition; @@ -64,6 +66,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker implement private boolean flushOnExit = false; private boolean flushLogCreated = false; private final Map<String, FlushOperation> scheduledFlushes = new HashMap<>(); + private long lastFlushTime = System.nanoTime(); public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo, ILSMComponentIdGenerator idGenerator) { @@ -213,6 +216,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker implement ILSMIndexAccessor accessor = 
lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE); accessor.getOpContext().setParameters(flushMap); ILSMIOOperation flush = accessor.scheduleFlush(); + lastFlushTime = System.nanoTime(); scheduledFlushes.put(flush.getTarget().getRelativePath(), (FlushOperation) flush); flush.addCompleteListener(this); } @@ -276,6 +280,15 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker implement return partition; } + public long getLastFlushTime() { + return lastFlushTime; + } + + @Override + public String toString() { + return "Dataset (" + datasetID + "), Partition (" + partition + ")"; + } + private boolean canSafelyFlush() { return numActiveOperations.get() == 0; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java index 72f987a..a6951b3 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java @@ -26,6 +26,7 @@ public class CheckpointProperties { private final int lsnThreshold; private final int pollFrequency; private final int historyToKeep; + private final int datasetCheckpointInterval; public CheckpointProperties(TransactionProperties txnProperties, String nodeId) { // Currently we use the log files directory for checkpoints @@ -33,6 +34,7 @@ public class CheckpointProperties { lsnThreshold = txnProperties.getCheckpointLSNThreshold(); pollFrequency = txnProperties.getCheckpointPollFrequency(); historyToKeep = txnProperties.getCheckpointHistory(); + 
datasetCheckpointInterval = txnProperties.getDatasetCheckpointInterval(); } public int getLsnThreshold() { @@ -51,10 +53,15 @@ public class CheckpointProperties { return checkpointDirPath; } + public int getDatasetCheckpointInterval() { + return datasetCheckpointInterval; + } + @Override public String toString() { return "{\"class\" : \"" + getClass().getSimpleName() + "\", \"checkpoint-dir-path\" : \"" + checkpointDirPath + "\", \"lsn-threshold\" : " + lsnThreshold + ", \"poll-frequency\" : " + pollFrequency - + ", \"history-to-keep\" : " + historyToKeep + " }"; + + ", \"history-to-keep\" : " + historyToKeep + ", \"dataset-checkpoint-interval\" : " + + datasetCheckpointInterval + "}"; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java index 36cea55..ac652e3 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java @@ -58,4 +58,11 @@ public interface ICheckpointManager extends ILifeCycleComponent { * @param id */ void completed(TxnId id); + + /** + * Checkpoints idle datasets by flushing their in-memory component to disk if needed. 
+ * + * @throws HyracksDataException + */ + void checkpointIdleDatasets() throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java index ce523db..ba945be 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java @@ -18,19 +18,23 @@ */ package org.apache.asterix.transaction.management.service.recovery; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Predicate; + import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.context.PrimaryIndexOperationTracker; +import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback; import org.apache.asterix.common.transactions.CheckpointProperties; import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - /** * An implementation of {@link ICheckpointManager} that defines the logic * of checkpoints. 
@@ -39,10 +43,12 @@ public class CheckpointManager extends AbstractCheckpointManager { private static final Logger LOGGER = LogManager.getLogger(); private static final long NO_SECURED_LSN = -1L; + private final long datasetCheckpointInterval; private final Map<TxnId, Long> securedLSNs; public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) { super(txnSubsystem, checkpointProperties); + datasetCheckpointInterval = checkpointProperties.getDatasetCheckpointInterval(); securedLSNs = new HashMap<>(); } @@ -78,9 +84,8 @@ public class CheckpointManager extends AbstractCheckpointManager { boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN; if (!checkpointSucceeded) { // Flush datasets with indexes behind target checkpoint LSN - IDatasetLifecycleManager datasetLifecycleManager = - txnSubsystem.getApplicationContext().getDatasetLifecycleManager(); - datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN); + final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager(); + dlcm.asyncFlushMatchingIndexes(newLaggingDatasetPredicate(checkpointTargetLSN)); } capture(minFirstLSN, false); if (checkpointSucceeded) { @@ -100,7 +105,31 @@ public class CheckpointManager extends AbstractCheckpointManager { securedLSNs.remove(id); } + @Override + public synchronized void checkpointIdleDatasets() throws HyracksDataException { + final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager(); + dlcm.asyncFlushMatchingIndexes(newIdleDatasetPredicate()); + } + private synchronized long getMinSecuredLSN() { return securedLSNs.isEmpty() ? 
NO_SECURED_LSN : Collections.min(securedLSNs.values()); } + + private Predicate<ILSMIndex> newIdleDatasetPredicate() { + final long currentTime = System.nanoTime(); + return lsmIndex -> { + if (lsmIndex.isPrimaryIndex()) { + PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) lsmIndex.getOperationTracker(); + return currentTime - opTracker.getLastFlushTime() >= datasetCheckpointInterval; + } + return false; + }; + } + + private Predicate<ILSMIndex> newLaggingDatasetPredicate(long checkpointTargetLSN) { + return lsmIndex -> { + final LSMIOOperationCallback ioCallback = (LSMIOOperationCallback) lsmIndex.getIOOperationCallback(); + return ioCallback.getPersistenceLsn() < checkpointTargetLSN; + }; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ff016514/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java index 1992057..446eec5 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java @@ -20,8 +20,6 @@ package org.apache.asterix.transaction.management.service.recovery; import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.common.transactions.ILogManager; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -59,29 +57,22 @@ public class CheckpointThread 
extends Thread { while (shouldRun) { try { sleep(checkpointTermInSecs * 1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - if (!shouldRun) { - return; - } - if (lastCheckpointLSN == -1) { - try { + if (!shouldRun) { + return; + } + if (lastCheckpointLSN == -1) { //Since the system just started up after sharp checkpoint, //last checkpoint LSN is considered as the min LSN of the current log partition lastCheckpointLSN = logManager.getReadableSmallestLSN(); - } catch (Exception e) { - LOGGER.log(Level.WARN, "Error getting smallest readable LSN", e); - lastCheckpointLSN = 0; } - } + checkpointManager.checkpointIdleDatasets(); + + //1. get current log LSN + currentLogLSN = logManager.getAppendLSN(); - //1. get current log LSN - currentLogLSN = logManager.getAppendLSN(); + //2. if current log LSN - previous checkpoint > threshold, do checkpoint + if (currentLogLSN - lastCheckpointLSN > lsnThreshold) { - //2. if current log LSN - previous checkpoint > threshold, do checkpoint - if (currentLogLSN - lastCheckpointLSN > lsnThreshold) { - try { // in check point: //1. get minimum first LSN (MFL) from open indexes. //2. if current MinFirstLSN < targetCheckpointLSN, schedule async flush for any open index which has first LSN < force flush delta @@ -94,9 +85,13 @@ public class CheckpointThread extends Thread { if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) { lastCheckpointLSN = currentCheckpointAttemptMinLSN; } - } catch (HyracksDataException e) { - LOGGER.log(Level.ERROR, "Error during checkpoint", e); + } + } catch (InterruptedException e) { + LOGGER.info("Checkpoint thread interrupted", e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOGGER.error("Error during checkpoint", e); } } }
