Updated Branches: refs/heads/trunk 2c65393d4 -> fcd0aebb6
Fix CommitLogReplayer date time issue patch by Vijay ; reviewed by jbellis for CASSANDRA-5909 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/742e5baf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/742e5baf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/742e5baf Branch: refs/heads/trunk Commit: 742e5baf6311a1141b42bec0b3c3dc2ff19fa376 Parents: b281dd1 Author: Vijay Parthasarathy <[email protected]> Authored: Sat Sep 14 16:05:54 2013 -0700 Committer: Vijay Parthasarathy <[email protected]> Committed: Sat Sep 14 16:05:54 2013 -0700 ---------------------------------------------------------------------- conf/commitlog_archiving.properties | 7 +++-- .../db/commitlog/CommitLogArchiver.java | 12 +++++++- .../db/commitlog/CommitLogReplayer.java | 2 +- test/conf/commitlog_archiving.properties | 19 +++++++++++++ .../cassandra/db/RecoveryManagerTest.java | 29 ++++++++++++++++++++ 5 files changed, 65 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/742e5baf/conf/commitlog_archiving.properties ---------------------------------------------------------------------- diff --git a/conf/commitlog_archiving.properties b/conf/commitlog_archiving.properties index 23adaeb..b19cfed 100644 --- a/conf/commitlog_archiving.properties +++ b/conf/commitlog_archiving.properties @@ -43,8 +43,8 @@ restore_command= # Directory to scan the recovery files in. restore_directories= -# Restore mutations created up to and including this timestamp. -# Format: 2012-04-31 20:43:12 +# Restore mutations created up to and including this timestamp in GMT. +# Format: yyyy:MM:dd HH:mm:ss (2012:04:31 20:43:12) # # Note! Recovery will stop when the first client-supplied timestamp # greater than this time is encountered. Since the order Cassandra @@ -52,3 +52,6 @@ restore_directories= # this may leave some mutations with timestamps earlier than the # point-in-time unrecovered. restore_point_in_time= + +# precision of the timestamp used in the inserts (MILLISECONDS, MICROSECONDS, ...) +precision=MILLISECONDS \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/742e5baf/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java index e9d850a..78026a4 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java @@ -27,6 +27,7 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Map; import java.util.Properties; +import java.util.TimeZone; import java.util.concurrent.*; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; @@ -43,12 +44,19 @@ import com.google.common.base.Strings; public class CommitLogArchiver { private static final Logger logger = LoggerFactory.getLogger(CommitLogArchiver.class); + public static final SimpleDateFormat format = new SimpleDateFormat("yyyy:MM:dd HH:mm:ss"); + static + { + format.setTimeZone(TimeZone.getTimeZone("GMT")); + } + public final Map<String, Future<?>> archivePending = new ConcurrentHashMap<String, Future<?>>(); public final ExecutorService executor = new JMXEnabledThreadPoolExecutor("commitlog_archiver"); private final String archiveCommand; private final String restoreCommand; private final String restoreDirectories; public final long restorePointInTime; + public final TimeUnit precision; public CommitLogArchiver() { @@ -65,6 +73,7 @@ public class CommitLogArchiver restoreCommand = null; restoreDirectories = null; restorePointInTime = Long.MAX_VALUE; + precision = TimeUnit.MILLISECONDS; } else { @@ -73,9 +82,10 @@ public class CommitLogArchiver restoreCommand = commitlog_commands.getProperty("restore_command"); restoreDirectories = commitlog_commands.getProperty("restore_directories"); String targetTime = commitlog_commands.getProperty("restore_point_in_time"); + precision = TimeUnit.valueOf(commitlog_commands.getProperty("precision", "MILLISECONDS")); try { - restorePointInTime = Strings.isNullOrEmpty(targetTime) ? Long.MAX_VALUE : new SimpleDateFormat("yyyy:MM:dd HH:mm:ss").parse(targetTime).getTime(); + restorePointInTime = Strings.isNullOrEmpty(targetTime) ? Long.MAX_VALUE : format.parse(targetTime).getTime(); } catch (ParseException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/742e5baf/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 6b401fb..796ab5b 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -281,7 +281,7 @@ public class CommitLogReplayer for (ColumnFamily families : frm.getColumnFamilies()) { - if (families.maxTimestamp() > restoreTarget) + if (CommitLog.instance.archiver.precision.toMillis(families.maxTimestamp()) > restoreTarget) return true; } return false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/742e5baf/test/conf/commitlog_archiving.properties ---------------------------------------------------------------------- diff --git a/test/conf/commitlog_archiving.properties b/test/conf/commitlog_archiving.properties new file mode 100644 index 0000000..aaf6bd1 --- /dev/null +++ b/test/conf/commitlog_archiving.properties @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +restore_point_in_time=2112:12:12 12:12:12 +precision=MICROSECONDS \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/742e5baf/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java index 68c0b37..9c17d80 100644 --- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java +++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java @@ -19,13 +19,17 @@ package org.apache.cassandra.db; import java.io.IOException; +import java.util.Date; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.cassandra.Util; +import org.junit.Assert; import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.CommitLogArchiver; import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.Util.column; @@ -101,4 +105,29 @@ public class RecoveryManagerTest extends SchemaLoader assert c != null; assert ((CounterColumn)c).total() == 10L; } + + @Test + public void testRecoverPIT() throws Exception + { + Date date = CommitLogArchiver.format.parse("2112:12:12 12:12:12"); + long timeMS = date.getTime() - 5000; + + Table keyspace1 = Table.open("Keyspace1"); + DecoratedKey dk = Util.dk("dkey"); + for (int i = 0; i < 10; ++i) + { + long ts = TimeUnit.MILLISECONDS.toMicros(timeMS + (i * 1000)); + ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard1"); + cf.addColumn(column("name-" + i, "value", ts)); + RowMutation rm = new RowMutation("Keyspace1", dk.key); + rm.add(cf); + rm.apply(); + } + keyspace1.getColumnFamilyStore("Standard1").clearUnsafe(); + CommitLog.instance.resetUnsafe(); // disassociate segments from live CL + CommitLog.instance.recover(); + + ColumnFamily cf = Util.getColumnFamily(keyspace1, dk, "Standard1"); + Assert.assertEquals(6, cf.getColumnCount()); + } }
