http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionType.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionType.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionType.java new file mode 100644 index 0000000..8abfc3b --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionType.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Autogenerated by Thrift Compiler (0.9.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.tephra.distributed.thrift; + + +public enum TTransactionType implements org.apache.thrift.TEnum { + SHORT(1), + LONG(2); + + private final int value; + + private TTransactionType(int value) { + this.value = value; + } + + /** + * Get the integer value of this enum value, as defined in the Thrift IDL. + */ + public int getValue() { + return value; + } + + /** + * Find the enum type by its integer value, as defined in the Thrift IDL. + * @return null if the value is not found. + */ + public static TTransactionType findByValue(int value) { + switch (value) { + case 1: + return SHORT; + case 2: + return LONG; + default: + return null; + } + } +}
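[Editor's note, not part of the commit: the generated accessors are inverses of each other — getValue() yields the integer wire value from the Thrift IDL, and findByValue() maps a wire value back to the enum, returning null for unknown values. A minimal usage sketch in Java:]

    // Encode: enum -> integer wire value, as defined in the Thrift IDL
    int wireValue = TTransactionType.LONG.getValue(); // 2
    // Decode: integer wire value -> enum; null signals a value not defined in the IDL
    TTransactionType decoded = TTransactionType.findByValue(wireValue);
    assert decoded == TTransactionType.LONG;
    assert TTransactionType.findByValue(99) == null; // unknown wire value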
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TVisibilityLevel.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TVisibilityLevel.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TVisibilityLevel.java new file mode 100644 index 0000000..ed081b5 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TVisibilityLevel.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Autogenerated by Thrift Compiler (0.9.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.tephra.distributed.thrift; + + +public enum TVisibilityLevel implements org.apache.thrift.TEnum { + SNAPSHOT(1), + SNAPSHOT_EXCLUDE_CURRENT(2), + SNAPSHOT_ALL(3); + + private final int value; + + private TVisibilityLevel(int value) { + this.value = value; + } + + /** + * Get the integer value of this enum value, as defined in the Thrift IDL. + */ + public int getValue() { + return value; + } + + /** + * Find the enum type by its integer value, as defined in the Thrift IDL. + * @return null if the value is not found. + */ + public static TVisibilityLevel findByValue(int value) { + switch (value) { + case 1: + return SNAPSHOT; + case 2: + return SNAPSHOT_EXCLUDE_CURRENT; + case 3: + return SNAPSHOT_ALL; + default: + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java new file mode 100644 index 0000000..c8bf22a --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.inmemory; + +import it.unimi.dsi.fastutil.longs.LongArrayList; +import org.apache.tephra.InvalidTruncateTimeException; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionCouldNotTakeSnapshotException; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.TransactionType; +import org.apache.tephra.TxConstants; + +import java.io.InputStream; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Implementation of the tx system client that doesn't talk to any global service and does its best to meet the + * tx system requirements/expectations. In fact it implements enough logic to support running flows (when each flowlet + * uses its own detached tx system client, without talking to the others or sharing any state) with a "process exactly + * once" guarantee if no failures happen. + * + * NOTE: Will NOT detect conflicts. May leave inconsistent state when the process crashes. Does NOT provide even read + * isolation guarantees. + * + * Good for performance testing, for demoing high throughput, and for use cases with relaxed tx guarantees. + */ +public class DetachedTxSystemClient implements TransactionSystemClient { + // Dataset and queue logic relies on tx id to grow monotonically even after restart. Hence we need to start with + // a value that is guaranteed to be bigger than the last one used before restart. + // NOTE: with the code below we assume we don't do more than TxConstants.MAX_TX_PER_MS tx/ms + // by a single client + private AtomicLong generator = new AtomicLong(System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS); + + @Override + public Transaction startShort() { + long wp = getWritePointer(); + // NOTE: -1 here is because we have logic that uses (readpointer + 1) as an "exclusive stop key" in some datasets + return new Transaction( + Long.MAX_VALUE - 1, wp, new long[0], new long[0], + Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT); + } + + private long getWritePointer() { + long wp = generator.incrementAndGet(); + // NOTE: using TxConstants.MAX_TX_PER_MS to stay at least close to the real one + long now = System.currentTimeMillis(); + if (wp < now * TxConstants.MAX_TX_PER_MS) { + // try to advance to align with the timestamp, but only once: if it fails, we'll just try again later with the next tx + long advanced = now * TxConstants.MAX_TX_PER_MS; + if (generator.compareAndSet(wp, advanced)) { + wp = advanced; + } + } + return wp; + } + + @Override + public Transaction startShort(int timeout) { + return startShort(); + } + + @Override + public Transaction startLong() { + return startShort(); + } + + @Override + public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) { + return true; + } + + @Override + public boolean commit(Transaction tx) { + return true; + } + + @Override + public void abort(Transaction tx) { + // do nothing + } + + @Override + public boolean invalidate(long tx) { + return true; + } + + @Override + public Transaction checkpoint(Transaction tx) { + long newWritePointer = getWritePointer(); + LongArrayList newCheckpointPointers = new LongArrayList(tx.getCheckpointWritePointers()); + newCheckpointPointers.add(newWritePointer); + return new Transaction(tx, newWritePointer, newCheckpointPointers.toLongArray()); + } + + @Override + public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException { + throw new TransactionCouldNotTakeSnapshotException( + "Snapshot not implemented in detached transaction system client"); + } + + @Override + public String status() { + return TxConstants.STATUS_OK; + } + + @Override + public void resetState() { + // do nothing + } + + @Override + public boolean truncateInvalidTx(Set<Long> invalidTxIds) { + return true; + } + + @Override + public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException { + return true; + } + + @Override + public int getInvalidSize() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java new file mode 100644 index 0000000..823f934 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tephra.inmemory; + +import com.google.common.util.concurrent.AbstractService; +import com.google.inject.Inject; +import com.google.inject.Provider; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TxConstants; +import org.apache.twill.common.Cancellable; +import org.apache.twill.discovery.Discoverable; +import org.apache.twill.discovery.DiscoveryService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +/** + * Transaction server that manages transaction data for the Reactor. + * <p> + * The transaction server is HA: one can start multiple instances, but only one of them is active at a time, and + * that instance registers itself in the discovery service. + * </p> + */ +public class InMemoryTransactionService extends AbstractService { + private static final Logger LOG = LoggerFactory.getLogger(InMemoryTransactionService.class); + + private final DiscoveryService discoveryService; + private final String serviceName; + protected final Provider<TransactionManager> txManagerProvider; + private Cancellable cancelDiscovery; + protected TransactionManager txManager; + + // thrift server config + protected final String address; + protected final int port; + protected final int threads; + protected final int ioThreads; + protected final int maxReadBufferBytes; + + @Inject + public InMemoryTransactionService(Configuration conf, + DiscoveryService discoveryService, + Provider<TransactionManager> txManagerProvider) { + + this.discoveryService = discoveryService; + this.txManagerProvider = txManagerProvider; + this.serviceName = conf.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME, + TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME); + + address = conf.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS, TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS); + port = conf.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT); + + // Retrieve the number of threads for the service + threads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS, + TxConstants.Service.DEFAULT_DATA_TX_SERVER_THREADS); + ioThreads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS, + TxConstants.Service.DEFAULT_DATA_TX_SERVER_IO_THREADS); + + maxReadBufferBytes = conf.getInt(TxConstants.Service.CFG_DATA_TX_THRIFT_MAX_READ_BUFFER, + TxConstants.Service.DEFAULT_DATA_TX_THRIFT_MAX_READ_BUFFER); + + LOG.info("Configuring TransactionService" + + ", address: " + address + + ", port: " + port + + ", threads: " + threads + + ", io threads: " + ioThreads + + ", max read buffer (bytes): " + maxReadBufferBytes); + } + + protected void undoRegister() { + if (cancelDiscovery != null) { + cancelDiscovery.cancel(); + } + } + + protected void doRegister() { + cancelDiscovery = discoveryService.register(new Discoverable() { + @Override + public String getName() { + return serviceName; + } + + @Override + public InetSocketAddress getSocketAddress() { + return getAddress(); + } + }); + } + + protected InetSocketAddress getAddress() { + return
new InetSocketAddress(1); + } + + @Override + protected void doStart() { + try { + txManager = txManagerProvider.get(); + txManager.startAndWait(); + doRegister(); + LOG.info("Transaction Thrift service started successfully on " + getAddress()); + notifyStarted(); + } catch (Throwable t) { + LOG.info("Transaction Thrift service didn't start on " + getAddress()); + notifyFailed(t); + } + } + + @Override + protected void doStop() { + undoRegister(); + txManager.stopAndWait(); + notifyStopped(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java new file mode 100644 index 0000000..da38dd2 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.inmemory; + +import com.google.inject.Inject; +import org.apache.tephra.InvalidTruncateTimeException; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionCouldNotTakeSnapshotException; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionNotInProgressException; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.TxConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Set; + +/** + * In-memory implementation of {@link TransactionSystemClient} that delegates all operations to a local + * {@link TransactionManager} in the same process. + */ +public class InMemoryTxSystemClient implements TransactionSystemClient { + + private static final Logger LOG = LoggerFactory.getLogger(InMemoryTxSystemClient.class); + + TransactionManager txManager; + + @Inject + public InMemoryTxSystemClient(TransactionManager txmgr) { + txManager = txmgr; + } + + @Override + public Transaction startLong() { + return txManager.startLong(); + } + + @Override + public Transaction startShort() { + return txManager.startShort(); + } + + @Override + public Transaction startShort(int timeout) { + return txManager.startShort(timeout); + } + + @Override + public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException { + return changeIds.isEmpty() || txManager.canCommit(tx, changeIds); + } + + @Override + public boolean commit(Transaction tx) throws TransactionNotInProgressException { + return txManager.commit(tx); + } + + @Override + public void abort(Transaction tx) { + txManager.abort(tx); + } + + @Override + public boolean invalidate(long tx) { + return txManager.invalidate(tx); + } + + @Override + public Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException { + return txManager.checkpoint(tx); + } + + @Override + public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + boolean snapshotTaken = txManager.takeSnapshot(out); + if (!snapshotTaken) { + throw new TransactionCouldNotTakeSnapshotException("Transaction manager did not take a snapshot."); + } + } finally { + out.close(); + } + return new ByteArrayInputStream(out.toByteArray()); + } catch (IOException e) { + LOG.error("Snapshot could not be taken", e); + throw new TransactionCouldNotTakeSnapshotException(e.getMessage()); + } + } + + @Override + public String status() { + return txManager.isRunning() ?
TxConstants.STATUS_OK : TxConstants.STATUS_NOTOK; + } + + @Override + public void resetState() { + txManager.resetState(); + } + + @Override + public boolean truncateInvalidTx(Set<Long> invalidTxIds) { + return txManager.truncateInvalidTx(invalidTxIds); + } + + @Override + public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException { + return txManager.truncateInvalidTxBefore(time); + } + + @Override + public int getInvalidSize() { + return txManager.getInvalidSize(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java new file mode 100644 index 0000000..2f60225 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.inmemory; + +import org.apache.tephra.InvalidTruncateTimeException; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionCouldNotTakeSnapshotException; +import org.apache.tephra.TransactionNotInProgressException; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.TransactionType; +import org.apache.tephra.TxConstants; + +import java.io.InputStream; +import java.util.Collection; +import java.util.Set; + +/** + * Dummy implementation of TxSystemClient. May be useful for perf testing. 
+ */ +public class MinimalTxSystemClient implements TransactionSystemClient { + private long currentTxPointer = 1; + + @Override + public Transaction startShort() { + long wp = currentTxPointer++; + // NOTE: -1 here is because we have logic that uses (readpointer + 1) as an "exclusive stop key" in some datasets + return new Transaction( + Long.MAX_VALUE - 1, wp, new long[0], new long[0], + Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT); + } + + @Override + public Transaction startShort(int timeout) { + return startShort(); + } + + @Override + public Transaction startLong() { + return startShort(); + } + + @Override + public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) { + return true; + } + + @Override + public boolean commit(Transaction tx) { + return true; + } + + @Override + public void abort(Transaction tx) { + // do nothing + } + + @Override + public boolean invalidate(long tx) { + return true; + } + + @Override + public Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException { + return tx; + } + + @Override + public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException { + throw new TransactionCouldNotTakeSnapshotException("No snapshot to take."); + } + + @Override + public String status() { + return TxConstants.STATUS_OK; + } + + @Override + public void resetState() { + // do nothing + } + + @Override + public boolean truncateInvalidTx(Set<Long> invalidTxIds) { + return true; + } + + @Override + public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException { + return true; + } + + @Override + public int getInvalidSize() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/inmemory/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/package-info.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/package-info.java new file mode 100644 index 0000000..2e396fe --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains the in-memory implementation of the transaction system v2.
+ */ +package org.apache.tephra.inmemory; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/metrics/DefaultMetricsCollector.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/metrics/DefaultMetricsCollector.java b/tephra-core/src/main/java/org/apache/tephra/metrics/DefaultMetricsCollector.java new file mode 100644 index 0000000..86d2afa --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/metrics/DefaultMetricsCollector.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.metrics; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.Slf4jReporter; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TxConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Default metrics collector implementation using <a href="http://metrics.dropwizard.io">Yammer Metrics</a>. + * + * <p>The reporting frequency for this collector can be configured by setting the + * {@code data.tx.metrics.period} configuration property to the reporting frequency in seconds. + * </p> + */ +public class DefaultMetricsCollector extends TxMetricsCollector { + private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricsCollector.class); + + private final MetricRegistry metrics = new MetricRegistry(); + private JmxReporter jmxReporter; + private ScheduledReporter reporter; + private int reportPeriod; + private ConcurrentMap<String, AtomicLong> gauges = Maps.newConcurrentMap(); + + @Override + public void configure(Configuration conf) { + // initialize selected output reporter + reportPeriod = conf.getInt(TxConstants.Metrics.REPORT_PERIOD_KEY, TxConstants.Metrics.REPORT_PERIOD_DEFAULT); + LOG.info("Configured metrics report to emit every {} seconds", reportPeriod); + // TODO: reporters should be pluggable based on injection + jmxReporter = JmxReporter.forRegistry(metrics).build(); + reporter = Slf4jReporter.forRegistry(metrics) + .outputTo(LoggerFactory.getLogger("tephra-metrics")) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(); + } + + + @Override + public void gauge(String metricName, int value, String... 
tags) { + AtomicLong gauge = gauges.get(metricName); + if (gauge == null) { + final AtomicLong newValue = new AtomicLong(); + if (gauges.putIfAbsent(metricName, newValue) == null) { + // first to set the value, need to register the metric + metrics.register(metricName, new Gauge<Long>() { + @Override + public Long getValue() { + return newValue.get(); + } + }); + gauge = newValue; + } else { + // someone else set it first + gauge = gauges.get(metricName); + } + } + gauge.set(value); + } + + @Override + public void histogram(String metricName, int value) { + metrics.histogram(metricName).update(value); + } + + @Override + public void rate(String metricName) { + metrics.meter(metricName).mark(); + } + + @Override + public void rate(String metricName, int count) { + metrics.meter(metricName).mark(count); + } + + @Override + protected void startUp() throws Exception { + jmxReporter.start(); + reporter.start(reportPeriod, TimeUnit.SECONDS); + LOG.info("Started metrics reporter"); + } + + @Override + protected void shutDown() throws Exception { + jmxReporter.stop(); + reporter.stop(); + LOG.info("Stopped metrics reporter"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/metrics/MetricsCollector.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/metrics/MetricsCollector.java b/tephra-core/src/main/java/org/apache/tephra/metrics/MetricsCollector.java new file mode 100644 index 0000000..45668e1 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/metrics/MetricsCollector.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.metrics; + +import com.google.common.util.concurrent.Service; +import org.apache.hadoop.conf.Configuration; + +/** + * Basic API for Tephra to support system metrics. + */ +public interface MetricsCollector extends Service { + /** + * Report a metric as an absolute value. + */ + void gauge(String metricName, int value, String... tags); + + /** + * Report a metric as a count over a given time duration. This method uses an implicit count of 1. + */ + void rate(String metricName); + + /** + * Report a metric as a count over a given time duration. + */ + void rate(String metricName, int count); + + /** + * Report a metric calculating the distribution of the value. + */ + void histogram(String metricName, int value); + + /** + * Called before the collector service is started, allowing the collector to set up any + * required configuration.
+ */ + void configure(Configuration conf); +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/metrics/TxMetricsCollector.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/metrics/TxMetricsCollector.java b/tephra-core/src/main/java/org/apache/tephra/metrics/TxMetricsCollector.java new file mode 100644 index 0000000..4dc7e2f --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/metrics/TxMetricsCollector.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.metrics; + +import com.google.common.util.concurrent.AbstractIdleService; +import org.apache.hadoop.conf.Configuration; + +/** + * Metrics collector class to emit transaction-related metrics. + * Note: this default implementation is a no-op and doesn't emit any metrics. + */ +public class TxMetricsCollector extends AbstractIdleService implements MetricsCollector { + + @Override + public void gauge(String metricName, int value, String... tags) { + //no-op + } + + @Override + public void rate(String metricName) { + // no-op + } + + @Override + public void rate(String metricName, int count) { + // no-op + } + + @Override + public void histogram(String metricName, int value) { + // no-op + } + + @Override + public void configure(Configuration conf) { + // no-op + } + + /* Service methods */ + + @Override + protected void startUp() throws Exception { + // no-op + } + + @Override + protected void shutDown() throws Exception { + // no-op + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/package-info.java b/tephra-core/src/main/java/org/apache/tephra/package-info.java new file mode 100644 index 0000000..dea84ac --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains implementations of the transaction system v2. + */ +package org.apache.tephra; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java new file mode 100644 index 0000000..b1e0978 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.collect.Lists; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.tephra.metrics.MetricsCollector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Common implementation of a transaction log, backed by file reader and writer based storage. Classes extending + * this class must also implement {@link TransactionLogWriter} and {@link TransactionLogReader}. + */ +public abstract class AbstractTransactionLog implements TransactionLog { + /** Time limit, in milliseconds, of an append to the transaction log before we log it as "slow". */ + private static final long SLOW_APPEND_THRESHOLD = 1000L; + + private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class); + + private final AtomicLong logSequence = new AtomicLong(); + private final MetricsCollector metricsCollector; + protected long timestamp; + private volatile boolean initialized; + private volatile boolean closed; + private AtomicLong syncedUpTo = new AtomicLong(); + private List<Entry> pendingWrites = Lists.newLinkedList(); + private TransactionLogWriter writer; + + public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) { + this.timestamp = timestamp; + this.metricsCollector = metricsCollector; + } + + /** + * Initializes the log file, opening a file writer. Clients calling {@code init()} should ensure that they + * also call {@link #close()}. + * @throws java.io.IOException If an error is encountered initializing the file writer. + */ + public synchronized void init() throws IOException { + if (initialized) { + return; + } + this.writer = createWriter(); + this.initialized = true; + } + + /** + * Returns a log writer to be used for appending any new {@link TransactionEdit} objects. + */ + protected abstract TransactionLogWriter createWriter() throws IOException; + + @Override + public abstract String getName(); + + @Override + public long getTimestamp() { + return timestamp; + } + + @Override + public void append(TransactionEdit edit) throws IOException { + long startTime = System.nanoTime(); + synchronized (this) { + ensureAvailable(); + + Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit); + + // add to pending edits + append(entry); + } + + // wait for sync to complete + sync(); + long durationMillis = (System.nanoTime() - startTime) / 1000000L; + if (durationMillis > SLOW_APPEND_THRESHOLD) { + LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec."); + } + } + + @Override + public void append(List<TransactionEdit> edits) throws IOException { + long startTime = System.nanoTime(); + synchronized (this) { + ensureAvailable(); + + for (TransactionEdit edit : edits) { + Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit); + + // add to pending edits + append(entry); + } + } + + // wait for sync to complete + sync(); + long durationMillis = (System.nanoTime() - startTime) / 1000000L; + if (durationMillis > SLOW_APPEND_THRESHOLD) { + LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec."); + } + } + + private void ensureAvailable() throws IOException { + if (closed) { + throw new IOException("Log " + getName() + " is already closed, cannot append!"); + } + if (!initialized) { + init(); + } + } + + /* + * Appends new writes to the pendingWrites. It is better to keep it in + * our own queue rather than writing it to the HDFS output stream because + * HDFSOutputStream.writeChunk is not lightweight at all. + */ + private void append(Entry e) throws IOException { + pendingWrites.add(e); + } + + // Returns all currently pending writes. New writes + // will accumulate in a new list. + private List<Entry> getPendingWrites() { + synchronized (this) { + List<Entry> save = this.pendingWrites; + this.pendingWrites = new LinkedList<>(); + return save; + } + } + + private void sync() throws IOException { + // writes out pending entries to the log + TransactionLogWriter tmpWriter = null; + long latestSeq = 0; + int entryCount = 0; + synchronized (this) { + if (closed) { + return; + } + // prevent writer from being dereferenced + tmpWriter = writer; + + List<Entry> currentPending = getPendingWrites(); + if (!currentPending.isEmpty()) { + tmpWriter.commitMarker(currentPending.size()); + } + + // write out all accumulated entries to log.
+ for (Entry e : currentPending) { + tmpWriter.append(e); + entryCount++; + latestSeq = Math.max(latestSeq, e.getKey().get()); + } + } + + long lastSynced = syncedUpTo.get(); + // someone else might have already synced our edits, avoid double syncing + if (lastSynced < latestSeq) { + tmpWriter.sync(); + metricsCollector.histogram("wal.sync.size", entryCount); + syncedUpTo.compareAndSet(lastSynced, latestSeq); + } + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + // perform a final sync if any outstanding writes + if (!pendingWrites.isEmpty()) { + sync(); + } + // NOTE: writer is lazy-inited, so it can be null + if (writer != null) { + this.writer.close(); + } + this.closed = true; + } + + public boolean isClosed() { + return closed; + } + + @Override + public abstract TransactionLogReader getReader() throws IOException; + + /** + * Represents an entry in the transaction log. Each entry consists of a key, generated from an incrementing sequence + * number, and a value, the {@link TransactionEdit} being stored. + */ + public static class Entry implements Writable { + private LongWritable key; + private TransactionEdit edit; + + // for Writable + public Entry() { + this.key = new LongWritable(); + this.edit = new TransactionEdit(); + } + + public Entry(LongWritable key, TransactionEdit edit) { + this.key = key; + this.edit = edit; + } + + public LongWritable getKey() { + return this.key; + } + + public TransactionEdit getEdit() { + return this.edit; + } + + @Override + public void write(DataOutput out) throws IOException { + this.key.write(out); + this.edit.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.key.readFields(in); + this.edit.readFields(in); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionStateStorage.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionStateStorage.java new file mode 100644 index 0000000..1c51ccd --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionStateStorage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.util.concurrent.AbstractIdleService; +import org.apache.tephra.snapshot.SnapshotCodecProvider; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Common base class for all transaction storage implementations. 
This implements the logic to prefix a snapshot + * with a version when encoding, and to select the correct codec for decoding based on this version prefix. + */ +public abstract class AbstractTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage { + + protected final SnapshotCodecProvider codecProvider; + + protected AbstractTransactionStateStorage(SnapshotCodecProvider codecProvider) { + this.codecProvider = codecProvider; + } + + @Override + public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException { + codecProvider.encode(out, snapshot); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/CommitMarkerCodec.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/CommitMarkerCodec.java b/tephra-core/src/main/java/org/apache/tephra/persist/CommitMarkerCodec.java new file mode 100644 index 0000000..c4f02e5 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/CommitMarkerCodec.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Ints; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.SequenceFile; +import org.apache.tephra.TxConstants; + +import java.io.Closeable; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * Class to read and write commit markers used in {@link HDFSTransactionLogReaderV2} and above. + */ +public class CommitMarkerCodec implements Closeable { + private static final byte[] KEY_BYTES = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED.getBytes(Charsets.UTF_8); + private final DataOutputBuffer rawKey; + private final DataOutputBuffer rawValue; + private SequenceFile.ValueBytes valueBytes; + + public CommitMarkerCodec() { + this.rawKey = new DataOutputBuffer(); + this.rawValue = new DataOutputBuffer(); + } + + @Override + public void close() throws IOException { + rawKey.close(); + rawValue.close(); + } + + // 1. Returns the count when the marker is written correctly + // 2. If data is incorrect (for example, an incorrect key, or a mismatch in key/value/record length), we throw an + // IOException since this indicates a corrupted log file + // 3. If data is incomplete, then we throw an EOFException, which is handled gracefully by the calling method + // since we can recover without any consequence + public int readMarker(SequenceFile.Reader reader) throws IOException { + if (valueBytes == null) { + valueBytes = reader.createValueBytes(); + } + rawKey.reset(); + rawValue.reset(); + + // valueBytes need not be reset since nextRaw call does it (and it is a private method) + int status = reader.nextRaw(rawKey, valueBytes); + + // if we reach EOF, return -1 + if (status == -1) { + return -1; + } + + // Check if the marker key is valid and return the count + if (isMarkerValid()) { + valueBytes.writeUncompressedBytes(rawValue); + rawValue.flush(); + // rawValue.getData() may return a larger byte array but Ints.fromByteArray will only read the first four bytes + return Ints.fromByteArray(rawValue.getData()); + } + + // EOF not reached and marker is not valid, so throw an IOException since we can't make progress + throw new IOException(String.format("Invalid key for num entries appended found %s, expected : %s", + new String(rawKey.getData()), TxConstants.TransactionLog.NUM_ENTRIES_APPENDED)); + } + + private boolean isMarkerValid() { + // rawKey should have the expected length and the matching bytes should start at index 0 + return rawKey.getLength() == KEY_BYTES.length && Bytes.indexOf(rawKey.getData(), KEY_BYTES) == 0; + } + + public static void writeMarker(SequenceFile.Writer writer, int count) throws IOException { + writer.appendRaw(KEY_BYTES, 0, KEY_BYTES.length, new CommitEntriesCount(count)); + } + + @VisibleForTesting + static final class CommitEntriesCount implements SequenceFile.ValueBytes { + private final int numEntries; + + public CommitEntriesCount(int numEntries) { + this.numEntries = numEntries; + } + + @Override + public void writeUncompressedBytes(DataOutputStream outStream) throws IOException { + outStream.write(Ints.toByteArray(numEntries)); + } + + @Override + public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException { + throw new IllegalArgumentException("Commit Entries count writing is not expected to be compressed."); + } + + @Override + public int getSize() { + return Ints.BYTES; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java new file mode 100644 index 0000000..ba781ac --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.tephra.TxConstants; +import org.apache.tephra.metrics.MetricsCollector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; + +/** + * Allows reading from and writing to a transaction write-ahead log stored in HDFS. + */ +public class HDFSTransactionLog extends AbstractTransactionLog { + private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLog.class); + + private final FileSystem fs; + private final Configuration hConf; + private final Path logPath; + + /** + * Creates a new HDFS-backed write-ahead log for storing transaction state. + * @param fs Open FileSystem instance for opening log files in HDFS. + * @param hConf HDFS cluster configuration. + * @param logPath Path to the log file. + */ + public HDFSTransactionLog(final FileSystem fs, final Configuration hConf, + final Path logPath, long timestamp, MetricsCollector metricsCollector) { + super(timestamp, metricsCollector); + this.fs = fs; + this.hConf = hConf; + this.logPath = logPath; + } + + @Override + protected TransactionLogWriter createWriter() throws IOException { + return new LogWriter(fs, hConf, logPath); + } + + @Override + public String getName() { + return logPath.getName(); + } + + @Override + public TransactionLogReader getReader() throws IOException { + FileStatus status = fs.getFileStatus(logPath); + long length = status.getLen(); + + TransactionLogReader reader = null; + // check if this file needs to be recovered due to failure + // Check for possibly empty file. With appends, currently Hadoop reports a + // zero length even if the file has been sync'd. Revisit if HDFS-376 or + // HDFS-878 is committed. + if (length <= 0) { + LOG.warn("File " + logPath + " might be still open, length is 0"); + } + + try { + HDFSUtil hdfsUtil = new HDFSUtil(); + hdfsUtil.recoverFileLease(fs, logPath, hConf); + try { + FileStatus newStatus = fs.getFileStatus(logPath); + LOG.info("New file size for " + logPath + " is " + newStatus.getLen()); + SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf); + reader = new HDFSTransactionLogReaderSupplier(fileReader).get(); + } catch (EOFException e) { + if (length <= 0) { + // TODO should we ignore an empty, not-last log file if skip.errors + // is false? Either way, the caller should decide what to do. E.g. + // ignore if this is the last log in sequence. + // TODO is this scenario still possible if the log has been + // recovered (i.e. closed) + LOG.warn("Could not open " + logPath + " for reading. File is empty", e); + return null; + } else { + // EOFException being ignored + return null; + } + } + } catch (IOException e) { + throw e; + } + return reader; + } + + @VisibleForTesting + static final class LogWriter implements TransactionLogWriter { + private final SequenceFile.Writer internalWriter; + public LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException { + // TODO: retry a few times to ride over transient failures? 
+ SequenceFile.Metadata metadata = new SequenceFile.Metadata(); + metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY), + new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION))); + + this.internalWriter = SequenceFile.createWriter(fs, hConf, logPath, LongWritable.class, TransactionEdit.class, + SequenceFile.CompressionType.NONE, null, null, metadata); + LOG.debug("Created a new TransactionLog writer for " + logPath); + } + + @Override + public void append(Entry entry) throws IOException { + internalWriter.append(entry.getKey(), entry.getEdit()); + } + + @Override + public void commitMarker(int count) throws IOException { + CommitMarkerCodec.writeMarker(internalWriter, count); + } + + @Override + public void sync() throws IOException { + internalWriter.syncFs(); + } + + @Override + public void close() throws IOException { + internalWriter.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java new file mode 100644 index 0000000..a517903 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.base.Supplier; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.tephra.TxConstants; + +/** + * Provides the correct version of {@link TransactionLogReader}, based on the log's version metadata, + * to read HDFS Transaction Logs. + */ +public class HDFSTransactionLogReaderSupplier implements Supplier<TransactionLogReader> { + private final SequenceFile.Reader reader; + private final byte version; + private TransactionLogReader logReader; + + public HDFSTransactionLogReaderSupplier(SequenceFile.Reader reader) { + this.reader = reader; + Text versionInfo = reader.getMetadata().get(new Text(TxConstants.TransactionLog.VERSION_KEY)); + this.version = versionInfo == null ? 
1 : Byte.parseByte(versionInfo.toString()); + } + + @Override + public TransactionLogReader get() { + if (logReader != null) { + return logReader; + } + + switch (version) { + case 2: + logReader = new HDFSTransactionLogReaderV2(reader); + return logReader; + case 1: + logReader = new HDFSTransactionLogReaderV1(reader); + return logReader; + default: + throw new IllegalArgumentException(String.format("Invalid version %s found in the Transaction Log", version)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java new file mode 100644 index 0000000..faefaec --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; + +/** + * {@link TransactionLogReader} that can read v1 (default) version of Transaction logs. The logs are expected to + * have a sequence of {@link TransactionEdit}s. + */ +public class HDFSTransactionLogReaderV1 implements TransactionLogReader { + private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV1.class); + private final SequenceFile.Reader reader; + private final LongWritable key; + private boolean closed; + + public HDFSTransactionLogReaderV1(SequenceFile.Reader reader) { + this.reader = reader; + this.key = new LongWritable(); + } + + @Override + public TransactionEdit next() throws IOException { + return next(new TransactionEdit()); + } + + @Override + public TransactionEdit next(TransactionEdit reuse) throws IOException { + if (closed) { + return null; + } + + try { + boolean successful = reader.next(key, reuse); + return successful ? reuse : null; + } catch (EOFException e) { + LOG.warn("Hit an unexpected EOF while trying to read the Transaction Edit. 
Skipping the entry.", e);
+      return null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    reader.close();
+    closed = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
new file mode 100644
index 0000000..ce50da8
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/**
+ * {@link TransactionLogReader} that can read the v2 version of transaction logs. The logs are expected to
+ * contain commit markers, each indicating the size of the batch of {@link TransactionEdit}s that follows the
+ * marker and was synced together. If fewer than the expected number of {@link TransactionEdit}s are present,
+ * that batch of {@link TransactionEdit}s is discarded.
+ */
+public class HDFSTransactionLogReaderV2 implements TransactionLogReader {
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class);
+
+  private final SequenceFile.Reader reader;
+  private final Queue<TransactionEdit> transactionEdits;
+  private final CommitMarkerCodec commitMarkerCodec;
+  private final LongWritable key;
+
+  private boolean closed;
+
+  public HDFSTransactionLogReaderV2(SequenceFile.Reader reader) {
+    this.reader = reader;
+    this.transactionEdits = new ArrayDeque<>();
+    this.key = new LongWritable();
+    this.commitMarkerCodec = new CommitMarkerCodec();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    try {
+      commitMarkerCodec.close();
+    } finally {
+      reader.close();
+      closed = true;
+    }
+  }
+
+  @Override
+  public TransactionEdit next() throws IOException {
+    return next(null);
+  }
+
+  @Override
+  public TransactionEdit next(TransactionEdit reuse) throws IOException {
+    // Note: the reuse instance is not used, since edits are buffered one commit batch at a time.
+    if (closed) {
+      return null;
+    }
+
+    if (!transactionEdits.isEmpty()) {
+      return transactionEdits.remove();
+    }
+
+    // Fetch the 'marker' and read 'marker' number of edits
+    populateTransactionEdits();
+    return transactionEdits.poll();
+  }
+
+  private void populateTransactionEdits() throws IOException {
+    // Read the marker to determine the number of entries to read.
+    int numEntries = 0;
+    try {
+      // readMarker can throw an EOFException while reading an incomplete commit marker; no other
+      // action is required, since we can safely ignore it and leave numEntries at 0.
+      numEntries = commitMarkerCodec.readMarker(reader);
+    } catch (EOFException e) {
+      LOG.warn("Reached EOF in log while trying to read commit marker", e);
+    }
+
+    for (int i = 0; i < numEntries; i++) {
+      TransactionEdit edit = new TransactionEdit();
+      try {
+        if (reader.next(key, edit)) {
+          transactionEdits.add(edit);
+        } else {
+          throw new EOFException("Attempt to read TransactionEdit failed.");
+        }
+      } catch (EOFException e) {
+        // we have reached EOF before reading back numEntries, so we clear the partial list and return.
+        LOG.warn("Reached EOF in log before reading {} entries. Ignoring all {} edits since the last marker",
+                 numEntries, transactionEdits.size(), e);
+        transactionEdits.clear();
+        return;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionStateStorage.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionStateStorage.java
new file mode 100644
index 0000000..d751ae2
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionStateStorage.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.CountingInputStream;
+import com.google.common.primitives.Longs;
+import com.google.inject.Inject;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.util.ConfigurationFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Handles persistence of transaction snapshots and logs to a directory in HDFS.
+ *
+ * The directory used for file storage is configured using the {@code data.tx.snapshot.dir} configuration property.
+ * Both snapshot and transaction log files are suffixed with a timestamp to allow easy ordering. Snapshot files
+ * are written with the filename "snapshot.<timestamp>". Transaction log files are written with the filename
+ * "txlog.<timestamp>".
+ */
+public class HDFSTransactionStateStorage extends AbstractTransactionStateStorage {
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionStateStorage.class);
+
+  private static final String SNAPSHOT_FILE_PREFIX = "snapshot.";
+  private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress.snapshot.";
+  private static final String LOG_FILE_PREFIX = "txlog.";
+
+  private static final PathFilter SNAPSHOT_FILE_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(SNAPSHOT_FILE_PREFIX);
+    }
+  };
+
+  // buffer size used for HDFS reads and writes
+  private static final int BUFFER_SIZE = 16384;
+
+  private final Configuration hConf;
+  private final String configuredSnapshotDir;
+  private final MetricsCollector metricsCollector;
+  private FileSystem fs;
+  private Path snapshotDir;
+
+  @Inject
+  public HDFSTransactionStateStorage(Configuration hConf, SnapshotCodecProvider codecProvider,
+                                     MetricsCollector metricsCollector) {
+    super(codecProvider);
+    this.hConf = hConf;
+    this.configuredSnapshotDir = hConf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
+    this.metricsCollector = metricsCollector;
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    Preconditions.checkState(configuredSnapshotDir != null,
+        "Snapshot directory is not configured. Please set " + TxConstants.Manager.CFG_TX_SNAPSHOT_DIR +
+        " in configuration.");
+    String hdfsUser = hConf.get(TxConstants.Manager.CFG_TX_HDFS_USER);
+    if (hdfsUser == null || UserGroupInformation.isSecurityEnabled()) {
+      if (hdfsUser != null && LOG.isDebugEnabled()) {
+        LOG.debug("Ignoring configuration {}={}, running on secure Hadoop",
+                  TxConstants.Manager.CFG_TX_HDFS_USER, hdfsUser);
+      }
+      // NOTE: this storage can be started multiple times. Since HDFS keeps a per-JVM FileSystem cache,
+      // we create a new instance instead of getting a cached one that may already have been closed.
+      fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf);
+    } else {
+      fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf, hdfsUser);
+    }
+    snapshotDir = new Path(configuredSnapshotDir);
+    LOG.info("Using snapshot dir " + snapshotDir);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    fs.close();
+  }
+
+  @Override
+  public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
+    // create a temporary file, and save the snapshot
+    Path snapshotTmpFile = new Path(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
+    LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile);
+
+    FSDataOutputStream out = fs.create(snapshotTmpFile, false, BUFFER_SIZE);
+    // encode the snapshot and stream the serialized version to the file
+    try {
+      codecProvider.encode(out, snapshot);
+    } finally {
+      out.close();
+    }
+
+    // move the temporary file into place with the correct filename
+    Path finalFile = new Path(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp());
+    fs.rename(snapshotTmpFile, finalFile);
+    LOG.debug("Completed snapshot to file {}", finalFile);
+  }
+
+  @Override
+  public TransactionSnapshot getLatestSnapshot() throws IOException {
+    InputStream in = getLatestSnapshotInputStream();
+    if (in == null) {
+      return null;
+    }
+    try {
+      return readSnapshotInputStream(in);
+    } finally {
+      in.close();
+    }
+  }
+
+  @Override
+  public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
+    InputStream in = getLatestSnapshotInputStream();
+    if (in == null) {
+      return null;
+    }
+    try {
+      return readTransactionVisibilityStateFromInputStream(in);
+    } finally {
+      in.close();
+    }
+  }
+
+  private InputStream getLatestSnapshotInputStream() throws IOException {
+    TimestampedFilename[] snapshots = listSnapshotFiles();
+    Arrays.sort(snapshots);
+    if (snapshots.length > 0) {
+      // last is the most recent
+      return fs.open(snapshots[snapshots.length - 1].getPath(), BUFFER_SIZE);
+    }
+
+    LOG.info("No snapshot files found in {}", snapshotDir);
+    return null;
+  }
+
+  private TransactionSnapshot readSnapshotInputStream(InputStream in) throws IOException {
+    CountingInputStream countingIn = new CountingInputStream(in);
+    TransactionSnapshot snapshot = codecProvider.decode(countingIn);
+    LOG.info("Read encoded transaction snapshot of {} bytes", countingIn.getCount());
+    return snapshot;
+  }
+
+  private TransactionVisibilityState readTransactionVisibilityStateFromInputStream(InputStream in)
+    throws IOException {
+    CountingInputStream countingIn = new CountingInputStream(in);
+    TransactionVisibilityState state = codecProvider.decodeTransactionVisibilityState(countingIn);
+    LOG.info("Read encoded transaction visibility state of {} bytes", countingIn.getCount());
+    return state;
+  }
+
+  private TransactionSnapshot readSnapshotFile(Path filePath) throws IOException {
+    FSDataInputStream in = fs.open(filePath, BUFFER_SIZE);
+    try {
+      return readSnapshotInputStream(in);
+    } finally {
+      in.close();
+    }
+  }
+
+  private TimestampedFilename[] listSnapshotFiles() throws IOException {
+    FileStatus[] snapshotFileStatuses = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER);
+    TimestampedFilename[] snapshotFiles = new TimestampedFilename[snapshotFileStatuses.length];
+    for (int i = 0; i < snapshotFileStatuses.length; i++) {
+      snapshotFiles[i] = new TimestampedFilename(snapshotFileStatuses[i].getPath());
+    }
+    return snapshotFiles;
+  }
+
+
@Override + public long deleteOldSnapshots(int numberToKeep) throws IOException { + TimestampedFilename[] snapshots = listSnapshotFiles(); + if (snapshots.length == 0) { + return -1; + } + Arrays.sort(snapshots, Collections.reverseOrder()); + if (snapshots.length <= numberToKeep) { + // nothing to remove, oldest timestamp is the last snapshot + return snapshots[snapshots.length - 1].getTimestamp(); + } + int toRemoveCount = snapshots.length - numberToKeep; + TimestampedFilename[] toRemove = new TimestampedFilename[toRemoveCount]; + System.arraycopy(snapshots, numberToKeep, toRemove, 0, toRemoveCount); + + for (TimestampedFilename f : toRemove) { + LOG.debug("Removing old snapshot file {}", f.getPath()); + fs.delete(f.getPath(), false); + } + long oldestTimestamp = snapshots[numberToKeep - 1].getTimestamp(); + LOG.debug("Removed {} old snapshot files prior to {}", toRemoveCount, oldestTimestamp); + return oldestTimestamp; + } + + @Override + public List<String> listSnapshots() throws IOException { + FileStatus[] files = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER); + return Lists.transform(Arrays.asList(files), new Function<FileStatus, String>() { + @Nullable + @Override + public String apply(@Nullable FileStatus input) { + return input.getPath().getName(); + } + }); + } + + @Override + public List<TransactionLog> getLogsSince(long timestamp) throws IOException { + FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(timestamp, Long.MAX_VALUE)); + TimestampedFilename[] timestampedFiles = new TimestampedFilename[statuses.length]; + for (int i = 0; i < statuses.length; i++) { + timestampedFiles[i] = new TimestampedFilename(statuses[i].getPath()); + } + return Lists.transform(Arrays.asList(timestampedFiles), new Function<TimestampedFilename, TransactionLog>() { + @Nullable + @Override + public TransactionLog apply(@Nullable TimestampedFilename input) { + return openLog(input.getPath(), input.getTimestamp()); + } + }); + } + + @Override + public TransactionLog createLog(long timestamp) throws IOException { + Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timestamp); + return openLog(newLog, timestamp); + } + + private TransactionLog openLog(Path path, long timestamp) { + return new HDFSTransactionLog(fs, hConf, path, timestamp, metricsCollector); + } + + @Override + public void deleteLogsOlderThan(long timestamp) throws IOException { + FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(0, timestamp)); + int removedCnt = 0; + for (FileStatus status : statuses) { + LOG.debug("Removing old transaction log {}", status.getPath()); + if (fs.delete(status.getPath(), false)) { + removedCnt++; + } else { + LOG.error("Failed to delete transaction log file {}", status.getPath()); + } + } + LOG.debug("Removed {} transaction logs older than {}", removedCnt, timestamp); + } + + @Override + public void setupStorage() throws IOException { + if (!fs.exists(snapshotDir)) { + LOG.info("Creating snapshot dir at {}", snapshotDir); + fs.mkdirs(snapshotDir); + } else { + Preconditions.checkState(fs.isDirectory(snapshotDir), + "Configured snapshot directory " + snapshotDir + " is not a directory!"); + } + } + + @Override + public List<String> listLogs() throws IOException { + FileStatus[] files = fs.listStatus(snapshotDir, new LogFileFilter(0, Long.MAX_VALUE)); + return Lists.transform(Arrays.asList(files), new Function<FileStatus, String>() { + @Nullable + @Override + public String apply(@Nullable FileStatus input) { + return input.getPath().getName(); + } + }); + } + + 
@Override + public String getLocation() { + return snapshotDir.toString(); + } + + private static class LogFileFilter implements PathFilter { + // starting time of files to include (inclusive) + private final long startTime; + // ending time of files to include (exclusive) + private final long endTime; + + public LogFileFilter(long startTime, long endTime) { + this.startTime = startTime; + this.endTime = endTime; + } + + @Override + public boolean accept(Path path) { + if (path.getName().startsWith(LOG_FILE_PREFIX)) { + String[] parts = path.getName().split("\\."); + if (parts.length == 2) { + try { + long fileTime = Long.parseLong(parts[1]); + return fileTime >= startTime && fileTime < endTime; + } catch (NumberFormatException ignored) { + LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", path.getName()); + } + } + } + return false; + } + } + + /** + * Represents a filename composed of a prefix and a ".timestamp" suffix. This is useful for manipulating both + * snapshot and transaction log filenames. + */ + private static class TimestampedFilename implements Comparable<TimestampedFilename> { + private Path path; + private String prefix; + private long timestamp; + + public TimestampedFilename(Path path) { + this.path = path; + String[] parts = path.getName().split("\\."); + if (parts.length != 2) { + throw new IllegalArgumentException("Filename " + path.getName() + + " did not match the expected pattern prefix.timestamp"); + } + prefix = parts[0]; + timestamp = Long.parseLong(parts[1]); + } + + public Path getPath() { + return path; + } + + public String getPrefix() { + return prefix; + } + + public long getTimestamp() { + return timestamp; + } + + @Override + public int compareTo(TimestampedFilename other) { + int res = prefix.compareTo(other.getPrefix()); + if (res == 0) { + res = Longs.compare(timestamp, other.getTimestamp()); + } + return res; + } + } + + // TODO move this out as a separate command line tool + private enum CLIMode { SNAPSHOT, TXLOG }; + /** + * Reads a transaction state snapshot or transaction log from HDFS and prints the entries to stdout. + * + * Supports the following options: + * -s read snapshot state (defaults to the latest) + * -l read a transaction log + * [filename] reads the given file + * @param args + */ + public static void main(String[] args) { + List<String> filenames = Lists.newArrayList(); + CLIMode mode = null; + for (String arg : args) { + if ("-s".equals(arg)) { + mode = CLIMode.SNAPSHOT; + } else if ("-l".equals(arg)) { + mode = CLIMode.TXLOG; + } else if ("-h".equals(arg)) { + printUsage(null); + } else { + filenames.add(arg); + } + } + + if (mode == null) { + printUsage("ERROR: Either -s or -l is required to set mode.", 1); + } + + Configuration config = new ConfigurationFactory().get(); + + // Use the no-op metrics collector. 
We are being run as a command line tool, so there are no relevant metrics + // to report + HDFSTransactionStateStorage storage = + new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()); + storage.startAndWait(); + try { + switch (mode) { + case SNAPSHOT: + try { + if (filenames.isEmpty()) { + TransactionSnapshot snapshot = storage.getLatestSnapshot(); + printSnapshot(snapshot); + } + for (String file : filenames) { + Path path = new Path(file); + TransactionSnapshot snapshot = storage.readSnapshotFile(path); + printSnapshot(snapshot); + System.out.println(); + } + } catch (IOException ioe) { + System.err.println("Error reading snapshot files: " + ioe.getMessage()); + ioe.printStackTrace(); + System.exit(1); + } + break; + case TXLOG: + if (filenames.isEmpty()) { + printUsage("ERROR: At least one transaction log filename is required!", 1); + } + for (String file : filenames) { + TimestampedFilename timestampedFilename = new TimestampedFilename(new Path(file)); + TransactionLog log = storage.openLog(timestampedFilename.getPath(), timestampedFilename.getTimestamp()); + printLog(log); + System.out.println(); + } + break; + } + } finally { + storage.stop(); + } + } + + private static void printUsage(String message) { + printUsage(message, 0); + } + + private static void printUsage(String message, int exitCode) { + if (message != null) { + System.out.println(message); + } + System.out.println("Usage: java " + HDFSTransactionStateStorage.class.getName() + " (-s|-l) file1 [file2...]"); + System.out.println(); + System.out.println("\t-s\tRead files as transaction state snapshots (will default to latest if no file given)"); + System.out.println("\t-l\tRead files as transaction logs [filename is required]"); + System.out.println("\t-h\tPrint this message"); + System.exit(exitCode); + } + + private static void printSnapshot(TransactionSnapshot snapshot) { + Date snapshotDate = new Date(snapshot.getTimestamp()); + System.out.println("TransactionSnapshot at " + snapshotDate.toString()); + System.out.println("\t" + snapshot.toString()); + } + + private static void printLog(TransactionLog log) { + try { + System.out.println("TransactionLog " + log.getName()); + TransactionLogReader reader = log.getReader(); + TransactionEdit edit; + long seq = 0; + while ((edit = reader.next()) != null) { + System.out.println(String.format(" %d: %s", seq++, edit.toString())); + } + } catch (IOException ioe) { + System.err.println("ERROR reading log " + log.getName() + ": " + ioe.getMessage()); + ioe.printStackTrace(); + } + } +}
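
The classes above compose as follows on the read path: HDFSTransactionLog.getReader() recovers the file lease, opens a SequenceFile.Reader, and defers to HDFSTransactionLogReaderSupplier, which returns the V1 or V2 reader based on the version byte stamped into the SequenceFile metadata at write time. A minimal sketch (illustrative, not part of the commit) of replaying one log, assuming a log file exists at the made-up path /tx/snapshot/txlog.1234:

// Sketch: replay all edits from a single transaction log (imports as in the files above).
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, new Path("/tx/snapshot/txlog.1234"), conf);
// The supplier inspects the version metadata and hands back HDFSTransactionLogReaderV1 or V2.
TransactionLogReader reader = new HDFSTransactionLogReaderSupplier(fileReader).get();
try {
  TransactionEdit edit;
  while ((edit = reader.next()) != null) {
    System.out.println(edit);
  }
} finally {
  reader.close();
}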
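
On the write path, HDFSTransactionLog.LogWriter stamps TxConstants.TransactionLog.CURRENT_VERSION into the SequenceFile metadata, and the v2 batch protocol is: write a commit marker announcing the batch size, append exactly that many edits, then sync. A sketch of that ordering, assuming the caller sits in the org.apache.tephra.persist package (LogWriter is package-visible) and that AbstractTransactionLog.Entry, which this commit does not show, has a (LongWritable, TransactionEdit) constructor:

// Sketch of the v2 write protocol. If the process dies before the sync(),
// HDFSTransactionLogReaderV2 finds fewer edits than the marker promised and
// discards the partial batch on replay.
TransactionLogWriter writer = new HDFSTransactionLog.LogWriter(fs, conf, new Path("/tx/snapshot/txlog.5678"));
List<TransactionEdit> batch = Collections.singletonList(new TransactionEdit());  // placeholder batch
writer.commitMarker(batch.size());   // 1. announce how many edits follow
long seq = 0;
for (TransactionEdit edit : batch) {
  // Assumed Entry constructor; append() pairs a sequence key with the edit.
  writer.append(new AbstractTransactionLog.Entry(new LongWritable(seq++), edit));
}
writer.sync();                       // 2. the batch becomes durable only here
writer.close();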
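
In HDFSTransactionStateStorage, getLogsSince(t) lists files through LogFileFilter(t, Long.MAX_VALUE) while deleteLogsOlderThan(t) uses LogFileFilter(0, t); the start bound is inclusive and the end bound exclusive, so a log whose embedded timestamp equals t is returned by the former and spared by the latter. In miniature:

// The timestamp is the second dot-separated component of "txlog.<timestamp>".
Path log = new Path("/tx/snapshot/txlog.1000");
long ts = Long.parseLong(log.getName().split("\\.")[1]);  // 1000
boolean returnedBySince = ts >= 1000;                     // getLogsSince(1000): true
boolean removedAsOlder = ts >= 0 && ts < 1000;            // deleteLogsOlderThan(1000): false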
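
Similarly, deleteOldSnapshots(int) sorts the snapshot files newest-first, deletes everything past numberToKeep, and returns the timestamp of the oldest file it kept (or of the oldest existing file when there are fewer than numberToKeep). A worked example of the index arithmetic:

// Four snapshots, keep the two newest: snapshot.200 and snapshot.100 are
// deleted, and the method returns 300, the oldest timestamp still retained.
long[] sortedNewestFirst = {400, 300, 200, 100};
int numberToKeep = 2;
int toRemoveCount = sortedNewestFirst.length - numberToKeep;  // 2
long oldestKept = sortedNewestFirst[numberToKeep - 1];        // 300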
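
Driving the storage programmatically mirrors what main() does above. A sketch, with an illustrative snapshot directory; startAndWait() and stop() come from the Guava service lifecycle the storage inherits, as used by main():

// Start the storage, ensure the snapshot directory exists, and fetch the most
// recent snapshot, which is null when nothing has been written yet.
Configuration conf = new ConfigurationFactory().get();
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, "/tx/snapshot");  // illustrative path
HDFSTransactionStateStorage storage =
    new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
storage.startAndWait();
try {
  storage.setupStorage();
  TransactionSnapshot latest = storage.getLatestSnapshot();
  System.out.println(latest == null ? "no snapshot yet" : "latest snapshot at " + latest.getTimestamp());
} finally {
  storage.stop();
}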
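
The same state can also be inspected from the shell through the class's main(), per printUsage() above (the log filename is illustrative):

java org.apache.tephra.persist.HDFSTransactionStateStorage -s
java org.apache.tephra.persist.HDFSTransactionStateStorage -l txlog.1400000000000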
