Revert "ACCUMULO-3423 optimize WAL metadata table updates"
This reverts commit 3fdd29f5222f9d1d32ca28b5ecf1d740a8d20f87.
Conflicts:
core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
test/src/test/java/org/apache/accumulo/test/VolumeIT.java
test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/36ca2575
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/36ca2575
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/36ca2575
Branch: refs/heads/master
Commit: 36ca257547edd92ae7d39ec06f64a3cf527e038e
Parents: 59913ea
Author: Josh Elser <[email protected]>
Authored: Sat May 9 14:48:33 2015 -0400
Committer: Josh Elser <[email protected]>
Committed: Sat May 9 14:48:33 2015 -0400
----------------------------------------------------------------------
.../client/impl/ReplicationOperationsImpl.java | 4 +-
.../org/apache/accumulo/core/conf/Property.java | 4 +-
.../accumulo/core/metadata/RootTable.java | 1 -
.../core/metadata/schema/MetadataSchema.java | 48 --
.../core/tabletserver/log/LogEntry.java | 78 +--
.../core/metadata/MetadataTableSchemaTest.java | 47 --
.../org/apache/accumulo/server/TabletLevel.java | 32 --
.../apache/accumulo/server/fs/VolumeUtil.java | 22 +-
.../apache/accumulo/server/init/Initialize.java | 1 -
.../server/master/state/MetaDataStateStore.java | 47 +-
.../master/state/MetaDataTableScanner.java | 6 +-
.../master/state/TabletLocationState.java | 7 -
.../server/master/state/TabletStateStore.java | 17 +-
.../master/state/ZooTabletStateStore.java | 36 +-
.../accumulo/server/replication/StatusUtil.java | 13 -
.../accumulo/server/util/ListVolumesUsed.java | 18 +-
.../server/util/MasterMetadataUtil.java | 16 +-
.../accumulo/server/util/MetadataTableUtil.java | 239 +++-----
.../server/util/ReplicationTableUtil.java | 13 +-
.../server/util/ReplicationTableUtilTest.java | 2 +-
.../gc/GarbageCollectWriteAheadLogs.java | 496 +++++++++-------
.../accumulo/gc/SimpleGarbageCollector.java | 1 +
.../CloseWriteAheadLogReferences.java | 23 +-
.../gc/GarbageCollectWriteAheadLogsTest.java | 567 +++++++++++++++++++
.../CloseWriteAheadLogReferencesTest.java | 151 ++++-
.../java/org/apache/accumulo/master/Master.java | 3 -
.../master/MasterClientServiceHandler.java | 3 +-
.../accumulo/master/TabletGroupWatcher.java | 30 +-
.../accumulo/master/replication/WorkMaker.java | 1 -
.../accumulo/master/state/MergeStats.java | 3 +-
.../master/ReplicationOperationsImplTest.java | 9 +-
.../apache/accumulo/master/TestMergeState.java | 2 +-
.../master/state/RootTabletStateStoreTest.java | 4 +-
.../src/main/findbugs/exclude-filter.xml | 2 +-
.../apache/accumulo/tserver/TabletServer.java | 183 +++---
.../apache/accumulo/tserver/log/DfsLogger.java | 14 +-
.../accumulo/tserver/log/SortedLogRecovery.java | 8 +-
.../tserver/log/TabletServerLogger.java | 193 +++----
.../accumulo/tserver/tablet/CommitSession.java | 3 +-
.../tserver/tablet/DatafileManager.java | 4 +-
.../apache/accumulo/tserver/tablet/Tablet.java | 59 +-
.../tserver/tablet/TabletCommitter.java | 3 +-
.../accumulo/tserver/log/LogEntryTest.java | 56 --
.../test/performance/thrift/NullTserver.java | 6 +-
.../accumulo/proxy/ProxyDurabilityIT.java | 9 +-
.../test/BadDeleteMarkersCreatedIT.java | 2 +-
.../org/apache/accumulo/test/BalanceIT.java | 20 +-
.../org/apache/accumulo/test/CleanWalIT.java | 1 -
.../accumulo/test/ConditionalWriterIT.java | 1 -
.../accumulo/test/GarbageCollectWALIT.java | 81 ---
.../MissingWalHeaderCompletesRecoveryIT.java | 14 +-
.../accumulo/test/NoMutationRecoveryIT.java | 178 ++++++
.../org/apache/accumulo/test/ShellServerIT.java | 2 +-
.../org/apache/accumulo/test/UnusedWALIT.java | 144 -----
.../java/org/apache/accumulo/test/VolumeIT.java | 17 -
.../accumulo/test/functional/ReadWriteIT.java | 8 -
.../accumulo/test/functional/WALSunnyDayIT.java | 250 --------
.../test/functional/WatchTheWatchCountIT.java | 2 +-
.../test/performance/RollWALPerformanceIT.java | 120 ----
...bageCollectorCommunicatesWithTServersIT.java | 35 +-
.../replication/MultiInstanceReplicationIT.java | 2 +-
.../test/replication/ReplicationIT.java | 370 ++++++++----
62 files changed, 1886 insertions(+), 1845 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 925877d..6a5c74a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -153,7 +153,9 @@ public class ReplicationOperationsImpl implements
ReplicationOperations {
try {
for (Entry<Key,Value> entry : metaBs) {
LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(),
entry.getValue());
- wals.add(new Path(logEntry.filename).toString());
+ for (String log : logEntry.logSet) {
+ wals.add(new Path(log).toString());
+ }
}
} finally {
metaBs.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 09d462d..b0ade7a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -342,8 +342,8 @@ public enum Property {
+ "no longer in use are removed from the filesystem."),
GC_PORT("gc.port.client", "50091", PropertyType.PORT, "The listening port
for the garbage collector's monitor service"),
GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number
of threads used to delete files"),
- GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not
use the Trash, even if it is configured."),
- GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive
any files/directories instead of moving to the HDFS trash or deleting."),
+ GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not
use the Trash, even if it is configured"),
+ GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive
any files/directories instead of moving to the HDFS trash or deleting"),
GC_TRACE_PERCENT("gc.trace.percent", "0.01", PropertyType.FRACTION, "Percent
of gc cycles to trace"),
// properties that are specific to the monitor server behavior
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
index 97d73d1..292ba3b 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
@@ -41,7 +41,6 @@ public class RootTable {
public static final String ZROOT_TABLET_FUTURE_LOCATION = ZROOT_TABLET +
"/future_location";
public static final String ZROOT_TABLET_LAST_LOCATION = ZROOT_TABLET +
"/lastlocation";
public static final String ZROOT_TABLET_WALOGS = ZROOT_TABLET + "/walogs";
- public static final String ZROOT_TABLET_CURRENT_LOGS = ZROOT_TABLET +
"/current_logs";
public static final String ZROOT_TABLET_PATH = ZROOT_TABLET + "/dir";
public static final KeyExtent EXTENT = new KeyExtent(new Text(ID), null,
null);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index fe75f9e..6baae17 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -16,14 +16,11 @@
*/
package org.apache.accumulo.core.metadata.schema;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.schema.Section;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.hadoop.io.Text;
@@ -281,49 +278,4 @@ public class MetadataSchema {
buff.set(buff.getBytes(), section.getRowPrefix().length(),
buff.getLength() - section.getRowPrefix().length());
}
}
-
- /**
- * Holds references to the WALs in use in a live Tablet Server.
- * <p>
- * <code>~wal+tserver:port[sessionId]
log:hdfs://localhost:8020/accumulo/wal/tserver+port/WAL [] -></code>
- */
- public static class CurrentLogsSection {
- private static final Section section = new Section(RESERVED_PREFIX +
"wal+", true, RESERVED_PREFIX + "wal,", false);
- private static byte LEFT_BRACKET = (byte) '[';
- public static final Text COLF = new Text("log");
- public static final Value UNUSED = new Value("unused".getBytes(UTF_8));
-
- public static Range getRange() {
- return section.getRange();
- }
-
- public static String getRowPrefix() {
- return section.getRowPrefix();
- }
-
- public static void getTabletServer(Key k, Text hostPort, Text session) {
- Preconditions.checkNotNull(k);
- Preconditions.checkNotNull(hostPort);
- Preconditions.checkNotNull(session);
-
- Text row = new Text();
- k.getRow(row);
- if (!row.toString().startsWith(section.getRowPrefix())) {
- throw new IllegalArgumentException("Bad key " + k.toString());
- }
- for (int sessionStart = section.getRowPrefix().length(); sessionStart <
row.getLength() - 1; sessionStart++) {
- if (row.charAt(sessionStart) == LEFT_BRACKET) {
- hostPort.set(row.getBytes(), section.getRowPrefix().length(),
sessionStart - section.getRowPrefix().length());
- session.set(row.getBytes(), sessionStart + 1, row.getLength() -
sessionStart - 2);
- return;
- }
- }
- throw new IllegalArgumentException("Bad key " + k.toString());
- }
-
- public static void getPath(Key k, Text path) {
- k.getColumnQualifier(path);
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
index ab70bb0..7fe61d1 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
@@ -16,10 +16,10 @@
*/
package org.apache.accumulo.core.tabletserver.log;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -29,29 +29,30 @@ import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
+import com.google.common.base.Joiner;
+
public class LogEntry {
- public final KeyExtent extent;
- public final long timestamp;
- public final String server;
- public final String filename;
+ public KeyExtent extent;
+ public long timestamp;
+ public String server;
+ public String filename;
+ public int tabletId;
+ public Collection<String> logSet;
+
+ public LogEntry() {}
public LogEntry(LogEntry le) {
this.extent = le.extent;
this.timestamp = le.timestamp;
this.server = le.server;
this.filename = le.filename;
- }
-
- public LogEntry(KeyExtent extent, long timestamp, String server, String
filename) {
- this.extent = extent;
- this.timestamp = timestamp;
- this.server = server;
- this.filename = filename;
+ this.tabletId = le.tabletId;
+ this.logSet = new ArrayList<String>(le.logSet);
}
@Override
public String toString() {
- return extent.toString() + " " + filename;
+ return extent.toString() + " " + filename + " (" + tabletId + ")";
}
public String getName() {
@@ -64,35 +65,43 @@ public class LogEntry {
out.writeLong(timestamp);
out.writeUTF(server);
out.writeUTF(filename);
+ out.write(tabletId);
+ out.write(logSet.size());
+ for (String s : logSet) {
+ out.writeUTF(s);
+ }
return Arrays.copyOf(out.getData(), out.getLength());
}
- static public LogEntry fromBytes(byte bytes[]) throws IOException {
+ public void fromBytes(byte bytes[]) throws IOException {
DataInputBuffer inp = new DataInputBuffer();
inp.reset(bytes, bytes.length);
- KeyExtent extent = new KeyExtent();
+ extent = new KeyExtent();
extent.readFields(inp);
- long timestamp = inp.readLong();
- String server = inp.readUTF();
- String filename = inp.readUTF();
- return new LogEntry(extent, timestamp, server, filename);
+ timestamp = inp.readLong();
+ server = inp.readUTF();
+ filename = inp.readUTF();
+ tabletId = inp.read();
+ int count = inp.read();
+ ArrayList<String> logSet = new ArrayList<String>(count);
+ for (int i = 0; i < count; i++)
+ logSet.add(inp.readUTF());
+ this.logSet = logSet;
}
static private final Text EMPTY_TEXT = new Text();
public static LogEntry fromKeyValue(Key key, Value value) {
- String qualifier = key.getColumnQualifier().toString();
- if (qualifier.indexOf('/') < 1) {
- throw new IllegalArgumentException("Bad key for log entry: " + key);
- }
- KeyExtent extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
+ LogEntry result = new LogEntry();
+ result.extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
String[] parts = key.getColumnQualifier().toString().split("/", 2);
- String server = parts[0];
- // handle old-style log entries that specify log sets
- parts = value.toString().split("\\|")[0].split(";");
- String filename = parts[parts.length - 1];
- long timestamp = key.getTimestamp();
- return new LogEntry(extent, timestamp, server, filename);
+ result.server = parts[0];
+ result.filename = parts[1];
+ parts = value.toString().split("\\|");
+ result.tabletId = Integer.parseInt(parts[1]);
+ result.logSet = Arrays.asList(parts[0].split(";"));
+ result.timestamp = key.getTimestamp();
+ return result;
}
public Text getRow() {
@@ -103,16 +112,11 @@ public class LogEntry {
return MetadataSchema.TabletsSection.LogColumnFamily.NAME;
}
- public String getUniqueID() {
- String parts[] = filename.split("/");
- return parts[parts.length - 1];
- }
-
public Text getColumnQualifier() {
return new Text(server + "/" + filename);
}
public Value getValue() {
- return new Value(filename.getBytes(UTF_8));
+ return new Value((Joiner.on(";").join(logSet) + "|" +
tabletId).getBytes());
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
deleted file mode 100644
index cfe59f2..0000000
--- a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.accumulo.core.metadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.accumulo.core.data.Key;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
-import org.apache.hadoop.io.Text;
-import org.junit.Test;
-
-public class MetadataTableSchemaTest {
-
- @Test
- public void testGetTabletServer() throws Exception {
- Key key = new Key("~wal+host:43861[14a7df0e6420003]", "log",
"hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
- Text hostPort = new Text();
- Text session = new Text();
- CurrentLogsSection.getTabletServer(key, hostPort, session);
- assertEquals("host:43861", hostPort.toString());
- assertEquals("14a7df0e6420003", session.toString());
- try {
- Key bogus = new Key("~wal/host:43861[14a7df0e6420003]", "log",
"hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
- CurrentLogsSection.getTabletServer(bogus, hostPort, session);
- fail("bad argument not thrown");
- } catch (IllegalArgumentException ex) {
-
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java b/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
deleted file mode 100644
index 50886ef..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server;
-
-import org.apache.accumulo.core.data.KeyExtent;
-
-public enum TabletLevel {
- ROOT, META, NORMAL;
-
- public static TabletLevel getLevel(KeyExtent extent) {
- if (!extent.isMeta())
- return NORMAL;
- if (extent.isRootTablet())
- return ROOT;
- return META;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index 4722e60..c3595cd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -128,12 +128,15 @@ public class VolumeUtil {
switchedPath = le.filename;
ArrayList<String> switchedLogs = new ArrayList<String>();
- String switchedLog = switchVolume(le.filename, FileType.WAL, replacements);
- if (switchedLog != null) {
- switchedLogs.add(switchedLog);
- numSwitched++;
- } else {
- switchedLogs.add(le.filename);
+ for (String log : le.logSet) {
+ String switchedLog = switchVolume(le.filename, FileType.WAL,
replacements);
+ if (switchedLog != null) {
+ switchedLogs.add(switchedLog);
+ numSwitched++;
+ } else {
+ switchedLogs.add(log);
+ }
+
}
if (numSwitched == 0) {
@@ -141,7 +144,9 @@ public class VolumeUtil {
return null;
}
- LogEntry newLogEntry = new LogEntry(le.extent, le.timestamp, le.server,
switchedPath);
+ LogEntry newLogEntry = new LogEntry(le);
+ newLogEntry.filename = switchedPath;
+ newLogEntry.logSet = switchedLogs;
log.trace("Switched " + le + " to " + newLogEntry);
@@ -239,7 +244,7 @@ public class VolumeUtil {
log.debug("Tablet directory switched, need to record old log files " +
logsToRemove + " " + ProtobufUtil.toString(status));
// Before deleting these logs, we need to mark them for replication
for (LogEntry logEntry : logsToRemove) {
- ReplicationTableUtil.updateFiles(context, extent, logEntry.filename,
status);
+ ReplicationTableUtil.updateFiles(context, extent, logEntry.logSet,
status);
}
}
}
@@ -248,6 +253,7 @@ public class VolumeUtil {
// method this should return the exact strings that are in the metadata
table
return ret;
+
}
private static String decommisionedTabletDir(AccumuloServerContext context,
ZooLock zooLock, VolumeManager vm, KeyExtent extent, String metaDir)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 9afb93f..c6f1dd8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -533,7 +533,6 @@ public class Initialize implements KeywordExecutable {
zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS,
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET,
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS,
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
- zoo.putPersistentData(zkInstanceRoot +
RootTable.ZROOT_TABLET_CURRENT_LOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_PATH,
rootTabletDir.getBytes(UTF_8), NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS,
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK,
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 600349b..7ee6f0c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -17,9 +17,6 @@
package org.apache.accumulo.server.master.state;
import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.BatchWriter;
@@ -30,14 +27,9 @@ import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
public class MetaDataStateStore extends TabletStateStore {
- private static final Logger log = Logger.getLogger(MetaDataStateStore.class);
private static final int THREADS = 4;
private static final int LATENCY = 1000;
@@ -67,7 +59,7 @@ public class MetaDataStateStore extends TabletStateStore {
@Override
public ClosableIterator<TabletLocationState> iterator() {
- return new MetaDataTableScanner(context,
MetadataSchema.TabletsSection.getRange(), state, targetTableName);
+ return new MetaDataTableScanner(context,
MetadataSchema.TabletsSection.getRange(), state);
}
@Override
@@ -124,7 +116,7 @@ public class MetaDataStateStore extends TabletStateStore {
}
@Override
- public void unassign(Collection<TabletLocationState> tablets,
Map<TServerInstance,List<Path>> logsForDeadServers) throws
DistributedStoreException {
+ public void unassign(Collection<TabletLocationState> tablets) throws
DistributedStoreException {
BatchWriter writer = createBatchWriter();
try {
@@ -132,15 +124,6 @@ public class MetaDataStateStore extends TabletStateStore {
Mutation m = new Mutation(tls.extent.getMetadataEntry());
if (tls.current != null) {
tls.current.clearLocation(m);
- if (logsForDeadServers != null) {
- List<Path> logs = logsForDeadServers.get(tls.current);
- if (logs != null) {
- for (Path log : logs) {
- LogEntry entry = new LogEntry(tls.extent, 0,
tls.current.hostPort(), log.toString());
- m.put(entry.getColumnFamily(), entry.getColumnQualifier(),
entry.getValue());
- }
- }
- }
}
if (tls.future != null) {
tls.future.clearFutureLocation(m);
@@ -162,30 +145,4 @@ public class MetaDataStateStore extends TabletStateStore {
public String name() {
return "Normal Tablets";
}
-
- @Override
- public void markLogsAsUnused(AccumuloServerContext context,
Map<TServerInstance,List<Path>> logs) throws DistributedStoreException {
- BatchWriter writer = createBatchWriter();
- try {
- for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) {
- if (entry.getValue().isEmpty()) {
- continue;
- }
- Mutation m = new
Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() +
entry.getKey().toString());
- for (Path log : entry.getValue()) {
- m.put(MetadataSchema.CurrentLogsSection.COLF, new
Text(log.toString()), MetadataSchema.CurrentLogsSection.UNUSED);
- }
- writer.addMutation(m);
- }
- } catch (Exception ex) {
- log.error("Error marking logs as unused: " + logs);
- throw new DistributedStoreException(ex);
- } finally {
- try {
- writer.close();
- } catch (MutationsRejectedException e) {
- throw new DistributedStoreException(e);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index bec2dc4..d64c108 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -141,7 +141,6 @@ public class MetaDataTableScanner implements
ClosableIterator<TabletLocationStat
boolean chopped = false;
for (Entry<Key,Value> entry : decodedRow.entrySet()) {
-
Key key = entry.getKey();
Text row = key.getRow();
Text cf = key.getColumnFamily();
@@ -174,9 +173,8 @@ public class MetaDataTableScanner implements
ClosableIterator<TabletLocationStat
}
}
if (extent == null) {
- String msg = "No prev-row for key extent " + decodedRow;
- log.error(msg);
- throw new BadLocationStateException(msg, k.getRow());
+ log.warn("No prev-row for key extent: " + decodedRow);
+ return null;
}
return new TabletLocationState(extent, future, current, last, walogs,
chopped);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
index 8116ecf..fb30440 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -68,13 +68,6 @@ public class TabletLocationState {
final public Collection<Collection<String>> walogs;
final public boolean chopped;
- public TServerInstance futureOrCurrent() {
- if (current != null) {
- return current;
- }
- return future;
- }
-
@Override
public String toString() {
return extent + "@(" + future + "," + current + "," + last + ")" +
(chopped ? " chopped" : "");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
index 147e071..5413e31 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@ -18,11 +18,8 @@ package org.apache.accumulo.server.master.state;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
-import java.util.Map;
import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.hadoop.fs.Path;
/**
* Interface for storing information about tablet assignments. There are three
implementations:
@@ -59,13 +56,10 @@ public abstract class TabletStateStore implements
Iterable<TabletLocationState>
*
* @param tablets
* the tablets' current information
- * @param logsForDeadServers
- * a cache of logs in use by servers when they died
*/
- abstract public void unassign(Collection<TabletLocationState> tablets,
Map<TServerInstance,List<Path>> logsForDeadServers) throws
DistributedStoreException;
+ abstract public void unassign(Collection<TabletLocationState> tablets)
throws DistributedStoreException;
- public static void unassign(AccumuloServerContext context,
TabletLocationState tls, Map<TServerInstance,List<Path>> logsForDeadServers)
- throws DistributedStoreException {
+ public static void unassign(AccumuloServerContext context,
TabletLocationState tls) throws DistributedStoreException {
TabletStateStore store;
if (tls.extent.isRootTablet()) {
store = new ZooTabletStateStore();
@@ -74,7 +68,7 @@ public abstract class TabletStateStore implements
Iterable<TabletLocationState>
} else {
store = new MetaDataStateStore(context);
}
- store.unassign(Collections.singletonList(tls), logsForDeadServers);
+ store.unassign(Collections.singletonList(tls));
}
public static void setLocation(AccumuloServerContext context, Assignment
assignment) throws DistributedStoreException {
@@ -89,9 +83,4 @@ public abstract class TabletStateStore implements
Iterable<TabletLocationState>
store.setLocations(Collections.singletonList(assignment));
}
- /**
- * When a server fails, its logs must be marked as unused after the log
markers are moved to the tablets.
- */
- abstract public void markLogsAsUnused(AccumuloServerContext context,
Map<TServerInstance,List<Path>> logs) throws DistributedStoreException;
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index 03627e3..ab99396 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@ -21,17 +21,12 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,9 +85,10 @@ public class ZooTabletStateStore extends TabletStateStore {
for (String entry :
store.getChildren(RootTable.ZROOT_TABLET_WALOGS)) {
byte[] logInfo = store.get(RootTable.ZROOT_TABLET_WALOGS + "/" +
entry);
if (logInfo != null) {
- LogEntry logEntry = LogEntry.fromBytes(logInfo);
- logs.add(Collections.singleton(logEntry.filename));
- log.debug("root tablet log " + logEntry.filename);
+ LogEntry logEntry = new LogEntry();
+ logEntry.fromBytes(logInfo);
+ logs.add(logEntry.logSet);
+ log.debug("root tablet logSet " + logEntry.logSet);
}
}
TabletLocationState result = new
TabletLocationState(RootTable.EXTENT, futureSession, currentSession,
lastSession, logs, false);
@@ -165,29 +161,12 @@ public class ZooTabletStateStore extends TabletStateStore
{
}
@Override
- public void unassign(Collection<TabletLocationState> tablets,
Map<TServerInstance,List<Path>> logsForDeadServers) throws
DistributedStoreException {
+ public void unassign(Collection<TabletLocationState> tablets) throws
DistributedStoreException {
if (tablets.size() != 1)
throw new IllegalArgumentException("There is only one root tablet");
TabletLocationState tls = tablets.iterator().next();
if (tls.extent.compareTo(RootTable.EXTENT) != 0)
throw new IllegalArgumentException("You can only store the root tablet
location");
- if (logsForDeadServers != null) {
- List<Path> logs = logsForDeadServers.get(tls.futureOrCurrent());
- if (logs != null) {
- for (Path entry : logs) {
- LogEntry logEntry = new LogEntry(RootTable.EXTENT,
System.currentTimeMillis(), tls.futureOrCurrent().getLocation().toString(),
entry.toString());
- byte[] value;
- try {
- value = logEntry.toBytes();
- } catch (IOException ex) {
- throw new DistributedStoreException(ex);
- }
- store.put(RootTable.ZROOT_TABLET_WALOGS + "/" +
logEntry.getUniqueID(), value);
- store.remove(RootTable.ZROOT_TABLET_CURRENT_LOGS + "/" +
MetadataSchema.CurrentLogsSection.getRowPrefix() + tls.current.toString()
- + logEntry.getUniqueID());
- }
- }
- }
store.remove(RootTable.ZROOT_TABLET_LOCATION);
store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
log.debug("unassign root tablet location");
@@ -198,9 +177,4 @@ public class ZooTabletStateStore extends TabletStateStore {
return "Root Table";
}
- @Override
- public void markLogsAsUnused(AccumuloServerContext context,
Map<TServerInstance,List<Path>> logs) {
- // the root table is not replicated, so unassigning the root tablet has
removed the current log marker
- }
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
index ad892b8..e6e3cfd 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
@@ -153,19 +153,6 @@ public class StatusUtil {
/**
* @return A {@link Status} for an open file of unspecified length, all of
which needs replicating.
*/
- public static Status openWithUnknownLength(long timeCreated) {
- Builder builder = Status.newBuilder();
- builder.setBegin(0);
- builder.setEnd(0);
- builder.setInfiniteEnd(true);
- builder.setClosed(false);
- builder.setCreatedTime(timeCreated);
- return builder.build();
- }
-
- /**
- * @return A {@link Status} for an open file of unspecified length, all of
which needs replicating.
- */
public static Status openWithUnknownLength() {
return INF_END_REPLICATION_STATUS;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
----------------------------------------------------------------------
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index 9e3fc7d..e90d1dd 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -35,7 +35,6 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
/**
*
@@ -62,6 +61,9 @@ public class ListVolumesUsed {
private static void getLogURIs(TreeSet<String> volumes, LogEntry logEntry) {
volumes.add(getLogURI(logEntry.filename));
+ for (String logSet : logEntry.logSet) {
+ volumes.add(getLogURI(logSet));
+ }
}
private static void listZookeeper() throws Exception {
@@ -121,20 +123,6 @@ public class ListVolumesUsed {
for (String volume : volumes)
System.out.println("\tVolume : " + volume);
-
- volumes.clear();
- scanner.clearColumns();
- scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
- Text path = new Text();
- for (Entry<Key,Value> entry : scanner) {
- MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
- volumes.add(getLogURI(path.toString()));
- }
-
- System.out.println("Listing volumes referenced in " + name + " current
logs section");
-
- for (String volume : volumes)
- System.out.println("\tVolume : " + volume);
}
public static void listVolumes(ClientContext context) throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index 29745d5..14eba68 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -248,27 +248,35 @@ public class MasterMetadataUtil {
if (unusedWalLogs != null) {
updateRootTabletDataFile(extent, path, mergeFile, dfv, time,
filesInUseByScans, address, zooLock, unusedWalLogs, lastLocation, flushId);
}
+
return;
}
+
Mutation m = getUpdateForTabletDataFile(extent, path, mergeFile, dfv,
time, filesInUseByScans, address, zooLock, unusedWalLogs, lastLocation,
flushId);
+
MetadataTableUtil.update(context, zooLock, m, extent);
+
}
/**
* Update the data file for the root tablet
*/
- private static void updateRootTabletDataFile(KeyExtent extent, FileRef path,
FileRef mergeFile, DataFileValue dfv, String time,
+ protected static void updateRootTabletDataFile(KeyExtent extent, FileRef
path, FileRef mergeFile, DataFileValue dfv, String time,
Set<FileRef> filesInUseByScans, String address, ZooLock zooLock,
Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
IZooReaderWriter zk = ZooReaderWriter.getInstance();
+ // unusedWalLogs will contain the location/name of each log in a log set
+ // the log set is stored under one of the log names, but not both
+ // find the entry under one of the names and delete it.
String root = MetadataTableUtil.getZookeeperLogLocation();
+ boolean foundEntry = false;
for (String entry : unusedWalLogs) {
String[] parts = entry.split("/");
String zpath = root + "/" + parts[parts.length - 1];
while (true) {
try {
if (zk.exists(zpath)) {
- log.debug("Removing WAL reference for root table " + zpath);
zk.recursiveDelete(zpath, NodeMissingPolicy.SKIP);
+ foundEntry = true;
}
break;
} catch (KeeperException e) {
@@ -279,6 +287,8 @@ public class MasterMetadataUtil {
UtilWaitThread.sleep(1000);
}
}
+ if (unusedWalLogs.size() > 0 && !foundEntry)
+ log.warn("WALog entry for root tablet did not exist " + unusedWalLogs);
}
/**
@@ -286,7 +296,7 @@ public class MasterMetadataUtil {
*
* @return A Mutation to update a tablet from the given information
*/
- private static Mutation getUpdateForTabletDataFile(KeyExtent extent, FileRef
path, FileRef mergeFile, DataFileValue dfv, String time,
+ protected static Mutation getUpdateForTabletDataFile(KeyExtent extent,
FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
Set<FileRef> filesInUseByScans, String address, ZooLock zooLock,
Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
Mutation m = new Mutation(extent.getMetadataEntry());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index d517989..5e74aac 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -23,6 +23,8 @@ import static
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -59,7 +61,6 @@ import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
@@ -81,12 +82,10 @@ import
org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.TabletLevel;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -122,7 +121,7 @@ public class MetadataTableUtil {
return metadataTable;
}
- public synchronized static Writer getRootTable(ClientContext context) {
+ private synchronized static Writer getRootTable(ClientContext context) {
Credentials credentials = context.getCredentials();
Writer rootTable = root_tables.get(credentials);
if (rootTable == null) {
@@ -224,7 +223,7 @@ public class MetadataTableUtil {
// add before removing in case of process death
for (LogEntry logEntry : logsToAdd)
- addRootLogEntry(context, zooLock, logEntry);
+ addLogEntry(context, logEntry, zooLock);
removeUnusedWALEntries(context, extent, logsToRemove, zooLock);
} else {
@@ -249,35 +248,6 @@ public class MetadataTableUtil {
}
}
- private static interface ZooOperation {
- void run(IZooReaderWriter rw) throws KeeperException,
InterruptedException, IOException;
- }
-
- private static void retryZooKeeperUpdate(ClientContext context, ZooLock
zooLock, ZooOperation op) {
- while (true) {
- try {
- IZooReaderWriter zoo = ZooReaderWriter.getInstance();
- if (zoo.isLockHeld(zooLock.getLockID())) {
- op.run(zoo);
- }
- break;
- } catch (Exception e) {
- log.error("Unexpected exception {}", e.getMessage(), e);
- }
- UtilWaitThread.sleep(1000);
- }
- }
-
- private static void addRootLogEntry(AccumuloServerContext context, ZooLock
zooLock, final LogEntry entry) {
- retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
- @Override
- public void run(IZooReaderWriter rw) throws KeeperException,
InterruptedException, IOException {
- String root = getZookeeperLogLocation();
- rw.putPersistentData(root + "/" + entry.getUniqueID(),
entry.toBytes(), NodeExistsPolicy.OVERWRITE);
- }
- });
- }
-
public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent
extent, ClientContext context) throws IOException {
TreeMap<FileRef,DataFileValue> sizes = new
TreeMap<FileRef,DataFileValue>();
@@ -477,6 +447,34 @@ public class MetadataTableUtil {
return ZooUtil.getRoot(HdfsZooInstance.getInstance()) +
RootTable.ZROOT_TABLET_WALOGS;
}
+ public static void addLogEntry(ClientContext context, LogEntry entry,
ZooLock zooLock) {
+ if (entry.extent.isRootTablet()) {
+ String root = getZookeeperLogLocation();
+ while (true) {
+ try {
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ if (zoo.isLockHeld(zooLock.getLockID())) {
+ String[] parts = entry.filename.split("/");
+ String uniqueId = parts[parts.length - 1];
+ zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(),
NodeExistsPolicy.OVERWRITE);
+ }
+ break;
+ } catch (KeeperException e) {
+ log.error("{}", e.getMessage(), e);
+ } catch (InterruptedException e) {
+ log.error("{}", e.getMessage(), e);
+ } catch (IOException e) {
+ log.error("{}", e.getMessage(), e);
+ }
+ UtilWaitThread.sleep(1000);
+ }
+ } else {
+ Mutation m = new Mutation(entry.getRow());
+ m.put(entry.getColumnFamily(), entry.getColumnQualifier(),
entry.getValue());
+ update(context, zooLock, m, entry.extent);
+ }
+ }
+
public static void setRootTabletDir(String dir) throws IOException {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
String zpath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) +
RootTable.ZROOT_TABLET_PATH;
@@ -567,11 +565,22 @@ public class MetadataTableUtil {
}
}
+ Collections.sort(result, new Comparator<LogEntry>() {
+ @Override
+ public int compare(LogEntry o1, LogEntry o2) {
+ long diff = o1.timestamp - o2.timestamp;
+ if (diff < 0)
+ return -1;
+ if (diff > 0)
+ return 1;
+ return 0;
+ }
+ });
log.info("Returning logs " + result + " for extent " + extent);
return result;
}
- static void getRootLogEntries(final ArrayList<LogEntry> result) throws
KeeperException, InterruptedException, IOException {
+ static void getRootLogEntries(ArrayList<LogEntry> result) throws
KeeperException, InterruptedException, IOException {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
String root = getZookeeperLogLocation();
// there's a little race between getting the children and fetching
@@ -579,10 +588,11 @@ public class MetadataTableUtil {
while (true) {
result.clear();
for (String child : zoo.getChildren(root)) {
+ LogEntry e = new LogEntry();
try {
- LogEntry e = LogEntry.fromBytes(zoo.getData(root + "/" + child,
null));
+ e.fromBytes(zoo.getData(root + "/" + child, null));
// upgrade from !0;!0<< -> +r<<
- e = new LogEntry(RootTable.EXTENT, 0, e.server, e.filename);
+ e.extent = RootTable.EXTENT;
result.add(e);
} catch (KeeperException.NoNodeException ex) {
continue;
@@ -652,23 +662,28 @@ public class MetadataTableUtil {
return new LogEntryIterator(context);
}
- public static void removeUnusedWALEntries(AccumuloServerContext context,
KeyExtent extent, final List<LogEntry> entries, ZooLock zooLock) {
+ public static void removeUnusedWALEntries(AccumuloServerContext context,
KeyExtent extent, List<LogEntry> logEntries, ZooLock zooLock) {
if (extent.isRootTablet()) {
- retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
- @Override
- public void run(IZooReaderWriter rw) throws KeeperException,
InterruptedException, IOException {
- String root = getZookeeperLogLocation();
- for (LogEntry entry : entries) {
- String path = root + "/" + entry.getUniqueID();
- log.debug("Removing " + path + " from zookeeper");
- rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
+ for (LogEntry entry : logEntries) {
+ String root = getZookeeperLogLocation();
+ while (true) {
+ try {
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ if (zoo.isLockHeld(zooLock.getLockID())) {
+ String parts[] = entry.filename.split("/");
+ zoo.recursiveDelete(root + "/" + parts[parts.length - 1],
NodeMissingPolicy.SKIP);
+ }
+ break;
+ } catch (Exception e) {
+ log.error("{}", e.getMessage(), e);
}
+ UtilWaitThread.sleep(1000);
}
- });
+ }
} else {
Mutation m = new Mutation(extent.getMetadataEntry());
- for (LogEntry entry : entries) {
- m.putDelete(entry.getColumnFamily(), entry.getColumnQualifier());
+ for (LogEntry entry : logEntries) {
+ m.putDelete(LogColumnFamily.NAME, new Text(entry.getName()));
}
update(context, zooLock, m, extent);
}
@@ -1053,130 +1068,4 @@ public class MetadataTableUtil {
return tabletEntries;
}
- public static void addNewLogMarker(ClientContext context, ZooLock zooLock,
final TServerInstance tabletSession, final Path filename, TabletLevel level) {
- log.debug("Adding log entry " + filename);
- if (level == TabletLevel.ROOT) {
- retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
- @Override
- public void run(IZooReaderWriter rw) throws KeeperException,
InterruptedException, IOException {
- String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) +
RootTable.ZROOT_TABLET_CURRENT_LOGS;
- String uniqueId = filename.getName();
- StringBuilder path = new StringBuilder(root);
- path.append("/");
- path.append(CurrentLogsSection.getRowPrefix());
- path.append(tabletSession.toString());
- path.append(uniqueId);
- rw.putPersistentData(path.toString(),
filename.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
- }
- });
- } else {
- Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() +
tabletSession.toString());
- m.put(CurrentLogsSection.COLF, new Text(filename.toString()), new
Value(EMPTY_BYTES));
- String tableName = MetadataTable.NAME;
- if (level == TabletLevel.META) {
- tableName = RootTable.NAME;
- }
- BatchWriter bw = null;
- try {
- bw = context.getConnector().createBatchWriter(tableName, null);
- bw.addMutation(m);
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- if (bw != null) {
- try {
- bw.close();
- } catch (Exception e2) {
- throw new RuntimeException(e2);
- }
- }
- }
- }
- }
-
- private static void removeCurrentRootLogMarker(ClientContext context,
ZooLock zooLock, final TServerInstance tabletSession, final Path filename) {
- retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
- @Override
- public void run(IZooReaderWriter rw) throws KeeperException,
InterruptedException, IOException {
- String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) +
RootTable.ZROOT_TABLET_CURRENT_LOGS;
- String uniqueId = filename.getName();
- String path = root + "/" + CurrentLogsSection.getRowPrefix() +
tabletSession.toString() + uniqueId;
- log.debug("Removing entry " + path + " from zookeeper");
- rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
- }
- });
- }
-
- public static void markLogUnused(ClientContext context, ZooLock lock,
TServerInstance tabletSession, Set<Path> all) throws AccumuloException {
- // There could be a marker at the meta and/or root level, mark them both
as unused
- try {
- BatchWriter root = null;
- BatchWriter meta = null;
- try {
- root = context.getConnector().createBatchWriter(RootTable.NAME, null);
- meta = context.getConnector().createBatchWriter(MetadataTable.NAME,
null);
- for (Path fname : all) {
- Text tname = new Text(fname.toString());
- Mutation m = new
Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() +
tabletSession.toString());
- m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
- root.addMutation(m);
- log.debug("deleting " +
MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + "
log:" + fname);
- m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() +
tabletSession.toString());
- m.put(MetadataSchema.CurrentLogsSection.COLF, tname,
MetadataSchema.CurrentLogsSection.UNUSED);
- meta.addMutation(m);
- removeCurrentRootLogMarker(context, lock, tabletSession, fname);
- }
- } finally {
- if (root != null) {
- root.close();
- }
- if (meta != null) {
- meta.close();
- }
- }
- } catch (Exception ex) {
- throw new AccumuloException(ex);
- }
- }
-
- public static void fetchLogsForDeadServer(ClientContext context, ZooLock
lock, KeyExtent extent, TServerInstance server,
- Map<TServerInstance,List<Path>> logsForDeadServers) throws
TableNotFoundException, AccumuloException, AccumuloSecurityException {
- // already cached
- if (logsForDeadServers.containsKey(server)) {
- return;
- }
- if (extent.isRootTablet()) {
- final List<Path> logs = new ArrayList<>();
- retryZooKeeperUpdate(context, lock, new ZooOperation() {
- @Override
- public void run(IZooReaderWriter rw) throws KeeperException,
InterruptedException, IOException {
- String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) +
RootTable.ZROOT_TABLET_CURRENT_LOGS;
- logs.clear();
- for (String child : rw.getChildren(root)) {
- logs.add(new Path(new String(rw.getData(root + "/" + child, null),
UTF_8)));
- }
- }
- });
- logsForDeadServers.put(server, logs);
- } else {
- // use the correct meta table
- String table = MetadataTable.NAME;
- if (extent.isMeta()) {
- table = RootTable.NAME;
- }
- // fetch the current logs in use, and put them in the cache
- Scanner scanner = context.getConnector().createScanner(table,
Authorizations.EMPTY);
- scanner.setRange(new
Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString()));
- List<Path> logs = new ArrayList<>();
- Text path = new Text();
- for (Entry<Key,Value> entry : scanner) {
- MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
- if
(!entry.getValue().equals(MetadataSchema.CurrentLogsSection.UNUSED)) {
- logs.add(new Path(path.toString()));
- }
- }
- logsForDeadServers.put(server, logs);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
----------------------------------------------------------------------
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
index c6d5ce4..8e755a3 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.server.util;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -175,14 +176,20 @@ public class ReplicationTableUtil {
/**
* Write replication ingest entries for each provided file with the given
{@link Status}.
*/
- public static void updateFiles(ClientContext context, KeyExtent extent,
String file, Status stat) {
+ public static void updateFiles(ClientContext context, KeyExtent extent,
Collection<String> files, Status stat) {
if (log.isDebugEnabled()) {
- log.debug("Updating replication status for " + extent + " with " + file
+ " using " + ProtobufUtil.toString(stat));
+ log.debug("Updating replication status for " + extent + " with " + files
+ " using " + ProtobufUtil.toString(stat));
}
// TODO could use batch writer, would need to handle failure and retry
like update does - ACCUMULO-1294
+ if (files.isEmpty()) {
+ return;
+ }
Value v = ProtobufUtil.toValue(stat);
- update(context, createUpdateMutation(new Path(file), v, extent), extent);
+ for (String file : files) {
+ // TODO Can preclude this addition if the extent is for a table we don't
need to replicate
+ update(context, createUpdateMutation(new Path(file), v, extent), extent);
+ }
}
static Mutation createUpdateMutation(Path file, Value v, KeyExtent extent) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
index 04a83d3..3983bde 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
@@ -91,7 +91,7 @@ public class ReplicationTableUtilTest {
String myFile = "file:////home/user/accumulo/wal/server+port/" + uuid;
long createdTime = System.currentTimeMillis();
- ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"),
null, null), myFile, StatusUtil.fileCreated(createdTime));
+ ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"),
null, null), Collections.singleton(myFile),
StatusUtil.fileCreated(createdTime));
verify(writer);