http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TTransactionType.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TTransactionType.java b/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TTransactionType.java deleted file mode 100644 index 0d10eb3..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TTransactionType.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Autogenerated by Thrift Compiler (0.9.0) - * - * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING - * @generated - */ -package co.cask.tephra.distributed.thrift; - - -import java.util.Map; -import java.util.HashMap; -import org.apache.thrift.TEnum; - -public enum TTransactionType implements org.apache.thrift.TEnum { - SHORT(1), - LONG(2); - - private final int value; - - private TTransactionType(int value) { - this.value = value; - } - - /** - * Get the integer value of this enum value, as defined in the Thrift IDL. - */ - public int getValue() { - return value; - } - - /** - * Find a the enum type by its integer value, as defined in the Thrift IDL. - * @return null if the value is not found. - */ - public static TTransactionType findByValue(int value) { - switch (value) { - case 1: - return SHORT; - case 2: - return LONG; - default: - return null; - } - } -}
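The generated enum above maps between enum constants and their Thrift IDL integer values through getValue() and findByValue(). A minimal sketch of that round trip, assuming the (now removed) co.cask.tephra.distributed.thrift.TTransactionType class is on the classpath; the wrapper class name TTransactionTypeRoundTrip is hypothetical and not part of the Tephra sources.

import co.cask.tephra.distributed.thrift.TTransactionType;

public class TTransactionTypeRoundTrip {
  public static void main(String[] args) {
    // Encode: the enum maps to the integer declared in the Thrift IDL (SHORT = 1, LONG = 2).
    int wireValue = TTransactionType.LONG.getValue();
    // Decode: findByValue returns null for integers that are not defined in the IDL.
    TTransactionType decoded = TTransactionType.findByValue(wireValue);
    if (decoded == null) {
      throw new IllegalArgumentException("Unknown TTransactionType value: " + wireValue);
    }
    System.out.println(decoded);  // prints LONG
  }
}

Because findByValue returns null rather than throwing, callers that read values off the wire need a null check like the one shown above.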
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TVisibilityLevel.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TVisibilityLevel.java b/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TVisibilityLevel.java deleted file mode 100644 index 6de6f87..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/thrift/TVisibilityLevel.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Autogenerated by Thrift Compiler (0.9.0) - * - * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING - * @generated - */ -package co.cask.tephra.distributed.thrift; - - -public enum TVisibilityLevel implements org.apache.thrift.TEnum { - SNAPSHOT(1), - SNAPSHOT_EXCLUDE_CURRENT(2), - SNAPSHOT_ALL(3); - - private final int value; - - private TVisibilityLevel(int value) { - this.value = value; - } - - /** - * Get the integer value of this enum value, as defined in the Thrift IDL. - */ - public int getValue() { - return value; - } - - /** - * Find a the enum type by its integer value, as defined in the Thrift IDL. - * @return null if the value is not found. - */ - public static TVisibilityLevel findByValue(int value) { - switch (value) { - case 1: - return SNAPSHOT; - case 2: - return SNAPSHOT_EXCLUDE_CURRENT; - case 3: - return SNAPSHOT_ALL; - default: - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/DetachedTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/DetachedTxSystemClient.java deleted file mode 100644 index 1731b0c..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/inmemory/DetachedTxSystemClient.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.inmemory; - -import co.cask.tephra.InvalidTruncateTimeException; -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionCouldNotTakeSnapshotException; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.TransactionType; -import co.cask.tephra.TxConstants; -import it.unimi.dsi.fastutil.longs.LongArrayList; - -import java.io.InputStream; -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Implementation of the tx system client that doesn't talk to any global service and tries to do its best to meet the - * tx system requirements/expectations. In fact it implements enough logic to support running flows (when each flowlet - * uses its own detached tx system client, without talking to each other and sharing any state) with "process exactly - * once" guarantee if no failures happen. - * - * NOTE: Will NOT detect conflicts. May leave inconsistent state when process crashes. Does NOT provide even read - * isolation guarantees. - * - * Good for performance testing. For demoing high throughput. For use-cases with relaxed tx guarantees. - */ -public class DetachedTxSystemClient implements TransactionSystemClient { - // Dataset and queue logic relies on tx id to grow monotonically even after restart. Hence we need to start with - // value that is for sure bigger than the last one used before restart. - // NOTE: with code below we assume we don't do more than InMemoryTransactionManager.MAX_TX_PER_MS tx/ms - // by single client - private AtomicLong generator = new AtomicLong(System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS); - - @Override - public Transaction startShort() { - long wp = getWritePointer(); - // NOTE: -1 here is because we have logic that uses (readpointer + 1) as a "exclusive stop key" in some datasets - return new Transaction( - Long.MAX_VALUE - 1, wp, new long[0], new long[0], - Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT); - } - - private long getWritePointer() { - long wp = generator.incrementAndGet(); - // NOTE: using InMemoryTransactionManager.MAX_TX_PER_MS to be at least close to real one - long now = System.currentTimeMillis(); - if (wp < now * TxConstants.MAX_TX_PER_MS) { - // trying to advance to align with timestamp, but only once: if failed, we'll just try again later with next tx - long advanced = now * TxConstants.MAX_TX_PER_MS; - if (generator.compareAndSet(wp, advanced)) { - wp = advanced; - } - } - return wp; - } - - @Override - public Transaction startShort(int timeout) { - return startShort(); - } - - @Override - public Transaction startLong() { - return startShort(); - } - - @Override - public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) { - return true; - } - - @Override - public boolean commit(Transaction tx) { - return true; - } - - @Override - public void abort(Transaction tx) { - // do nothing - } - - @Override - public boolean invalidate(long tx) { - return true; - } - - @Override - public Transaction checkpoint(Transaction tx) { - long newWritePointer = getWritePointer(); - LongArrayList newCheckpointPointers = new LongArrayList(tx.getCheckpointWritePointers()); - newCheckpointPointers.add(newWritePointer); - return new Transaction(tx, newWritePointer, newCheckpointPointers.toLongArray()); - } - - @Override - public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException { 
- throw new TransactionCouldNotTakeSnapshotException( - "Snapshot not implemented in detached transaction system client"); - } - - @Override - public String status() { - return TxConstants.STATUS_OK; - } - - @Override - public void resetState() { - // do nothing - } - - @Override - public boolean truncateInvalidTx(Set<Long> invalidTxIds) { - return true; - } - - @Override - public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException { - return true; - } - - @Override - public int getInvalidSize() { - return 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTransactionService.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTransactionService.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTransactionService.java deleted file mode 100644 index feecdce..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTransactionService.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package co.cask.tephra.inmemory; - -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TxConstants; -import com.google.common.util.concurrent.AbstractService; -import com.google.inject.Inject; -import com.google.inject.Provider; -import org.apache.hadoop.conf.Configuration; -import org.apache.twill.common.Cancellable; -import org.apache.twill.discovery.Discoverable; -import org.apache.twill.discovery.DiscoveryService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; - -/** - * Transaction server that manages transaction data for the Reactor. - * <p> - * Transaction server is HA, one can start multiple instances, only one of which is active and will register itself in - * discovery service. 
- * </p> - */ -public class InMemoryTransactionService extends AbstractService { - private static final Logger LOG = LoggerFactory.getLogger(InMemoryTransactionService.class); - - private final DiscoveryService discoveryService; - private final String serviceName; - protected final Provider<TransactionManager> txManagerProvider; - private Cancellable cancelDiscovery; - protected TransactionManager txManager; - - // thrift server config - protected final String address; - protected final int port; - protected final int threads; - protected final int ioThreads; - protected final int maxReadBufferBytes; - - @Inject - public InMemoryTransactionService(Configuration conf, - DiscoveryService discoveryService, - Provider<TransactionManager> txManagerProvider) { - - this.discoveryService = discoveryService; - this.txManagerProvider = txManagerProvider; - this.serviceName = conf.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME, - TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME); - - address = conf.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS, TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS); - port = conf.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT); - - // Retrieve the number of threads for the service - threads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS, - TxConstants.Service.DEFAULT_DATA_TX_SERVER_THREADS); - ioThreads = conf.getInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS, - TxConstants.Service.DEFAULT_DATA_TX_SERVER_IO_THREADS); - - maxReadBufferBytes = conf.getInt(TxConstants.Service.CFG_DATA_TX_THRIFT_MAX_READ_BUFFER, - TxConstants.Service.DEFAULT_DATA_TX_THRIFT_MAX_READ_BUFFER); - - LOG.info("Configuring TransactionService" + - ", address: " + address + - ", port: " + port + - ", threads: " + threads + - ", io threads: " + ioThreads + - ", max read buffer (bytes): " + maxReadBufferBytes); - } - - protected void undoRegister() { - if (cancelDiscovery != null) { - cancelDiscovery.cancel(); - } - } - - protected void doRegister() { - cancelDiscovery = discoveryService.register(new Discoverable() { - @Override - public String getName() { - return serviceName; - } - - @Override - public InetSocketAddress getSocketAddress() { - return getAddress(); - } - }); - } - - protected InetSocketAddress getAddress() { - return new InetSocketAddress(1); - } - - @Override - protected void doStart() { - try { - txManager = txManagerProvider.get(); - txManager.startAndWait(); - doRegister(); - LOG.info("Transaction Thrift service started successfully on " + getAddress()); - notifyStarted(); - } catch (Throwable t) { - LOG.info("Transaction Thrift service didn't start on " + getAddress()); - notifyFailed(t); - } - } - - @Override - protected void doStop() { - undoRegister(); - txManager.stopAndWait(); - notifyStopped(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTxSystemClient.java deleted file mode 100644 index ba15269..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/inmemory/InMemoryTxSystemClient.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.inmemory; - -import co.cask.tephra.InvalidTruncateTimeException; -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionCouldNotTakeSnapshotException; -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TransactionNotInProgressException; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.TxConstants; -import com.google.inject.Inject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Collection; -import java.util.Set; - -/** - * - */ -public class InMemoryTxSystemClient implements TransactionSystemClient { - - private static final Logger LOG = LoggerFactory.getLogger(InMemoryTxSystemClient.class); - - TransactionManager txManager; - - @Inject - public InMemoryTxSystemClient(TransactionManager txmgr) { - txManager = txmgr; - } - - @Override - public Transaction startLong() { - return txManager.startLong(); - } - - @Override - public Transaction startShort() { - return txManager.startShort(); - } - - @Override - public Transaction startShort(int timeout) { - return txManager.startShort(timeout); - } - - @Override - public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException { - return changeIds.isEmpty() || txManager.canCommit(tx, changeIds); - } - - @Override - public boolean commit(Transaction tx) throws TransactionNotInProgressException { - return txManager.commit(tx); - } - - @Override - public void abort(Transaction tx) { - txManager.abort(tx); - } - - @Override - public boolean invalidate(long tx) { - return txManager.invalidate(tx); - } - - @Override - public Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException { - return txManager.checkpoint(tx); - } - - @Override - public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException { - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - try { - boolean snapshotTaken = txManager.takeSnapshot(out); - if (!snapshotTaken) { - throw new TransactionCouldNotTakeSnapshotException("Transaction manager did not take a snapshot."); - } - } finally { - out.close(); - } - return new ByteArrayInputStream(out.toByteArray()); - } catch (IOException e) { - LOG.error("Snapshot could not be taken", e); - throw new TransactionCouldNotTakeSnapshotException(e.getMessage()); - } - } - - @Override - public String status() { - return txManager.isRunning() ? 
TxConstants.STATUS_OK : TxConstants.STATUS_NOTOK; - } - - @Override - public void resetState() { - txManager.resetState(); - } - - @Override - public boolean truncateInvalidTx(Set<Long> invalidTxIds) { - return txManager.truncateInvalidTx(invalidTxIds); - } - - @Override - public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException { - return txManager.truncateInvalidTxBefore(time); - } - - @Override - public int getInvalidSize() { - return txManager.getInvalidSize(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/MinimalTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/MinimalTxSystemClient.java deleted file mode 100644 index 6f83565..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/inmemory/MinimalTxSystemClient.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.inmemory; - -import co.cask.tephra.InvalidTruncateTimeException; -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionCouldNotTakeSnapshotException; -import co.cask.tephra.TransactionNotInProgressException; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.TransactionType; -import co.cask.tephra.TxConstants; - -import java.io.InputStream; -import java.util.Collection; -import java.util.Set; - -/** - * Dummy implementation of TxSystemClient. May be useful for perf testing. 
- */ -public class MinimalTxSystemClient implements TransactionSystemClient { - private long currentTxPointer = 1; - - @Override - public Transaction startShort() { - long wp = currentTxPointer++; - // NOTE: -1 here is because we have logic that uses (readpointer + 1) as an "exclusive stop key" in some datasets - return new Transaction( - Long.MAX_VALUE - 1, wp, new long[0], new long[0], - Transaction.NO_TX_IN_PROGRESS, TransactionType.SHORT); - } - - @Override - public Transaction startShort(int timeout) { - return startShort(); - } - - @Override - public Transaction startLong() { - return startShort(); - } - - @Override - public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) { - return true; - } - - @Override - public boolean commit(Transaction tx) { - return true; - } - - @Override - public void abort(Transaction tx) { - // do nothing - } - - @Override - public boolean invalidate(long tx) { - return true; - } - - @Override - public Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException { - return tx; - } - - @Override - public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException { - throw new TransactionCouldNotTakeSnapshotException("No snapshot to take."); - } - - @Override - public String status() { - return TxConstants.STATUS_OK; - } - - @Override - public void resetState() { - // do nothing - } - - @Override - public boolean truncateInvalidTx(Set<Long> invalidTxIds) { - return true; - } - - @Override - public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException { - return true; - } - - @Override - public int getInvalidSize() { - return 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/inmemory/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/inmemory/package-info.java b/tephra-core/src/main/java/co/cask/tephra/inmemory/package-info.java deleted file mode 100644 index b7a7c59..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/inmemory/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This package contains an in-memory implementation of the transaction system v2. 
- */ -package co.cask.tephra.inmemory; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java deleted file mode 100644 index 8e33b4d..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.metrics; - -import co.cask.tephra.TxConstants; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.JmxReporter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.ScheduledReporter; -import com.codahale.metrics.Slf4jReporter; -import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Default metrics collector implementation using <a href="http://metrics.dropwizard.io">Yammer Metrics</a>. - * - * <p>The reporting frequency for this collector can be configured by setting the - * {@code data.tx.metrics.period} configuration property to the reporting frequency in seconds. - * </p> - */ -public class DefaultMetricsCollector extends TxMetricsCollector { - private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricsCollector.class); - - private final MetricRegistry metrics = new MetricRegistry(); - private JmxReporter jmxReporter; - private ScheduledReporter reporter; - private int reportPeriod; - private ConcurrentMap<String, AtomicLong> gauges = Maps.newConcurrentMap(); - - @Override - public void configure(Configuration conf) { - // initialize selected output reporter - reportPeriod = conf.getInt(TxConstants.Metrics.REPORT_PERIOD_KEY, TxConstants.Metrics.REPORT_PERIOD_DEFAULT); - LOG.info("Configured metrics report to emit every {} seconds", reportPeriod); - // TODO: reporters should be pluggable based on injection - jmxReporter = JmxReporter.forRegistry(metrics).build(); - reporter = Slf4jReporter.forRegistry(metrics) - .outputTo(LoggerFactory.getLogger("tephra-metrics")) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(); - } - - - @Override - public void gauge(String metricName, int value, String... 
tags) { - AtomicLong gauge = gauges.get(metricName); - if (gauge == null) { - final AtomicLong newValue = new AtomicLong(); - if (gauges.putIfAbsent(metricName, newValue) == null) { - // first to set the value, need to register the metric - metrics.register(metricName, new Gauge<Long>() { - @Override - public Long getValue() { - return newValue.get(); - } - }); - gauge = newValue; - } else { - // someone else set it first - gauge = gauges.get(metricName); - } - } - gauge.set(value); - } - - @Override - public void histogram(String metricName, int value) { - metrics.histogram(metricName).update(value); - } - - @Override - public void rate(String metricName) { - metrics.meter(metricName).mark(); - } - - @Override - public void rate(String metricName, int count) { - metrics.meter(metricName).mark(count); - } - - @Override - protected void startUp() throws Exception { - jmxReporter.start(); - reporter.start(reportPeriod, TimeUnit.SECONDS); - LOG.info("Started metrics reporter"); - } - - @Override - protected void shutDown() throws Exception { - jmxReporter.stop(); - reporter.stop(); - LOG.info("Stopped metrics reporter"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java deleted file mode 100644 index 3aae4e0..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.metrics; - -import com.google.common.util.concurrent.Service; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; - -/** - * Basic API for Tephra to support system metrics. - */ -public interface MetricsCollector extends Service { - /** - * Report a metric as an absolute value. - */ - void gauge(String metricName, int value, String... tags); - - /** - * Report a metric as a count over a given time duration. This method uses an implicit count of 1. - */ - void rate(String metricName); - - /** - * Report a metric as a count over a given time duration. - */ - void rate(String metricName, int count); - - /** - * Report a metric calculating the distribution of the value. - */ - void histogram(String metricName, int value); - - /** - * Called before the collector service is started, allowing the collector to setup any - * required configuration. 
- */ - void configure(Configuration conf); -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java deleted file mode 100644 index 7fba8e5..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.metrics; - -import com.google.common.util.concurrent.AbstractIdleService; -import org.apache.hadoop.conf.Configuration; - -/** - * Metrics Collector Class, to emit Transaction Related Metrics. - * Note: This default implementation is a no-op and doesn't emit any metrics - */ -public class TxMetricsCollector extends AbstractIdleService implements MetricsCollector { - - @Override - public void gauge(String metricName, int value, String... tags) { - //no-op - } - - @Override - public void rate(String metricName) { - // no-op - } - - @Override - public void rate(String metricName, int count) { - // no-op - } - - @Override - public void histogram(String metricName, int value) { - // no-op - } - - @Override - public void configure(Configuration conf) { - // no-op - } - - /* Service methods */ - - @Override - protected void startUp() throws Exception { - // no-op - } - - @Override - protected void shutDown() throws Exception { - // no-op - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/package-info.java b/tephra-core/src/main/java/co/cask/tephra/package-info.java deleted file mode 100644 index 6f2f858..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This package contains implementations of the transaction system v2. - */ -package co.cask.tephra; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java deleted file mode 100644 index 173cc9f..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.metrics.MetricsCollector; -import com.google.common.collect.Lists; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Common implementation of a transaction log, backed by file reader and writer based storage. Classes extending - * this class, must also implement {@link TransactionLogWriter} and {@link TransactionLogReader}. - */ -public abstract class AbstractTransactionLog implements TransactionLog { - /** Time limit, in milliseconds, of an append to the transaction log before we log it as "slow". */ - private static final long SLOW_APPEND_THRESHOLD = 1000L; - - private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class); - - private final AtomicLong logSequence = new AtomicLong(); - private final MetricsCollector metricsCollector; - protected long timestamp; - private volatile boolean initialized; - private volatile boolean closed; - private AtomicLong syncedUpTo = new AtomicLong(); - private List<Entry> pendingWrites = Lists.newLinkedList(); - private TransactionLogWriter writer; - - public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) { - this.timestamp = timestamp; - this.metricsCollector = metricsCollector; - } - - /** - * Initializes the log file, opening a file writer. Clients calling {@code init()} should ensure that they - * also call {@link HDFSTransactionLog#close()}. - * @throws java.io.IOException If an error is encountered initializing the file writer. 
- */ - public synchronized void init() throws IOException { - if (initialized) { - return; - } - this.writer = createWriter(); - this.initialized = true; - } - - /** - * Returns a log writer to be used for appending any new {@link TransactionEdit} objects. - */ - protected abstract TransactionLogWriter createWriter() throws IOException; - - @Override - public abstract String getName(); - - @Override - public long getTimestamp() { - return timestamp; - } - - @Override - public void append(TransactionEdit edit) throws IOException { - long startTime = System.nanoTime(); - synchronized (this) { - ensureAvailable(); - - Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit); - - // add to pending edits - append(entry); - } - - // wait for sync to complete - sync(); - long durationMillis = (System.nanoTime() - startTime) / 1000000L; - if (durationMillis > SLOW_APPEND_THRESHOLD) { - LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec."); - } - } - - @Override - public void append(List<TransactionEdit> edits) throws IOException { - long startTime = System.nanoTime(); - synchronized (this) { - ensureAvailable(); - - for (TransactionEdit edit : edits) { - Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit); - - // add to pending edits - append(entry); - } - } - - // wait for sync to complete - sync(); - long durationMillis = (System.nanoTime() - startTime) / 1000000L; - if (durationMillis > SLOW_APPEND_THRESHOLD) { - LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec."); - } - } - - private void ensureAvailable() throws IOException { - if (closed) { - throw new IOException("Log " + getName() + " is already closed, cannot append!"); - } - if (!initialized) { - init(); - } - } - - /* - * Appends new writes to the pendingWrites. It is better to keep it in - * our own queue rather than writing it to the HDFS output stream because - * HDFSOutputStream.writeChunk is not lightweight at all. - */ - private void append(Entry e) throws IOException { - pendingWrites.add(e); - } - - // Returns all currently pending writes. New writes - // will accumulate in a new list. - private List<Entry> getPendingWrites() { - synchronized (this) { - List<Entry> save = this.pendingWrites; - this.pendingWrites = new LinkedList<>(); - return save; - } - } - - private void sync() throws IOException { - // writes out pending entries to the HLog - TransactionLogWriter tmpWriter = null; - long latestSeq = 0; - int entryCount = 0; - synchronized (this) { - if (closed) { - return; - } - // prevent writer being dereferenced - tmpWriter = writer; - - List<Entry> currentPending = getPendingWrites(); - if (!currentPending.isEmpty()) { - tmpWriter.commitMarker(currentPending.size()); - } - - // write out all accumulated entries to log. 
- for (Entry e : currentPending) { - tmpWriter.append(e); - entryCount++; - latestSeq = Math.max(latestSeq, e.getKey().get()); - } - } - - long lastSynced = syncedUpTo.get(); - // someone else might have already synced our edits, avoid double syncing - if (lastSynced < latestSeq) { - tmpWriter.sync(); - metricsCollector.histogram("wal.sync.size", entryCount); - syncedUpTo.compareAndSet(lastSynced, latestSeq); - } - } - - @Override - public synchronized void close() throws IOException { - if (closed) { - return; - } - // perform a final sync if any outstanding writes - if (!pendingWrites.isEmpty()) { - sync(); - } - // NOTE: writer is lazy-inited, so it can be null - if (writer != null) { - this.writer.close(); - } - this.closed = true; - } - - public boolean isClosed() { - return closed; - } - - @Override - public abstract TransactionLogReader getReader() throws IOException; - - /** - * Represents an entry in the transaction log. Each entry consists of a key, generated from an incrementing sequence - * number, and a value, the {@link TransactionEdit} being stored. - */ - public static class Entry implements Writable { - private LongWritable key; - private TransactionEdit edit; - - // for Writable - public Entry() { - this.key = new LongWritable(); - this.edit = new TransactionEdit(); - } - - public Entry(LongWritable key, TransactionEdit edit) { - this.key = key; - this.edit = edit; - } - - public LongWritable getKey() { - return this.key; - } - - public TransactionEdit getEdit() { - return this.edit; - } - - @Override - public void write(DataOutput out) throws IOException { - this.key.write(out); - this.edit.write(out); - } - - @Override - public void readFields(DataInput in) throws IOException { - this.key.readFields(in); - this.edit.readFields(in); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionStateStorage.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionStateStorage.java deleted file mode 100644 index 682435e..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionStateStorage.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.snapshot.SnapshotCodecProvider; -import com.google.common.util.concurrent.AbstractIdleService; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * Common base class for all transaction storage implementations. 
This implement logic to prefix a snapshot - * with a version when encoding, and to select the correct codec for decoding based on this version prefix. - */ -public abstract class AbstractTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage { - - protected final SnapshotCodecProvider codecProvider; - - protected AbstractTransactionStateStorage(SnapshotCodecProvider codecProvider) { - this.codecProvider = codecProvider; - } - - @Override - public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException { - codecProvider.encode(out, snapshot); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/CommitMarkerCodec.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/CommitMarkerCodec.java b/tephra-core/src/main/java/co/cask/tephra/persist/CommitMarkerCodec.java deleted file mode 100644 index c1796bd..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/CommitMarkerCodec.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.TxConstants; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; -import com.google.common.primitives.Bytes; -import com.google.common.primitives.Ints; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.SequenceFile; - -import java.io.Closeable; -import java.io.DataOutputStream; -import java.io.IOException; - -/** - * Class to read and write commit markers used in {@link HDFSTransactionLogReaderV2} and above. - */ -public class CommitMarkerCodec implements Closeable { - private static final byte[] KEY_BYTES = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED.getBytes(Charsets.UTF_8); - private final DataOutputBuffer rawKey; - private final DataOutputBuffer rawValue; - private SequenceFile.ValueBytes valueBytes; - - public CommitMarkerCodec() { - this.rawKey = new DataOutputBuffer(); - this.rawValue = new DataOutputBuffer(); - } - - @Override - public void close() throws IOException { - rawKey.close(); - rawValue.close(); - } - - // 1. Returns the count when the marker is written correctly - // 2. If data is incorrect (for ex, incorrect key, mismatch in key/value/record length), we throw IOException - // since this indicates corrupted log file - // 3. 
If data is incomplete, then we throw EOFException which is handled gracefully by the calling method - // since we can recover without any consequence - public int readMarker(SequenceFile.Reader reader) throws IOException { - if (valueBytes == null) { - valueBytes = reader.createValueBytes(); - } - rawKey.reset(); - rawValue.reset(); - - // valueBytes need not be reset since nextRaw call does it (and it is a private method) - int status = reader.nextRaw(rawKey, valueBytes); - - // if we reach EOF, return -1 - if (status == -1) { - return -1; - } - - // Check if the marker key is valid and return the count - if (isMarkerValid()) { - valueBytes.writeUncompressedBytes(rawValue); - rawValue.flush(); - // rawValue.getData() may return a larger byte array but Ints.fromByteArray will only read the first four bytes - return Ints.fromByteArray(rawValue.getData()); - } - - // EOF not reached and marker is not valid, then thrown an IOException since we can't make progress - throw new IOException(String.format("Invalid key for num entries appended found %s, expected : %s", - new String(rawKey.getData()), TxConstants.TransactionLog.NUM_ENTRIES_APPENDED)); - } - - private boolean isMarkerValid() { - // rawKey should have the expected length and the matching bytes should start at index 0 - return rawKey.getLength() == KEY_BYTES.length && Bytes.indexOf(rawKey.getData(), KEY_BYTES) == 0; - } - - public static void writeMarker(SequenceFile.Writer writer, int count) throws IOException { - writer.appendRaw(KEY_BYTES, 0, KEY_BYTES.length, new CommitEntriesCount(count)); - } - - @VisibleForTesting - static final class CommitEntriesCount implements SequenceFile.ValueBytes { - private final int numEntries; - - public CommitEntriesCount(int numEntries) { - this.numEntries = numEntries; - } - - @Override - public void writeUncompressedBytes(DataOutputStream outStream) throws IOException { - outStream.write(Ints.toByteArray(numEntries)); - } - - @Override - public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException { - throw new IllegalArgumentException("Commit Entries count writing is not expected to be compressed."); - } - - @Override - public int getSize() { - return Ints.BYTES; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java deleted file mode 100644 index bed90c2..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.TxConstants; -import co.cask.tephra.metrics.MetricsCollector; -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.EOFException; -import java.io.IOException; - -/** - * Allows reading from and writing to a transaction write-ahead log stored in HDFS. - */ -public class HDFSTransactionLog extends AbstractTransactionLog { - private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLog.class); - - private final FileSystem fs; - private final Configuration hConf; - private final Path logPath; - - /** - * Creates a new HDFS-backed write-ahead log for storing transaction state. - * @param fs Open FileSystem instance for opening log files in HDFS. - * @param hConf HDFS cluster configuration. - * @param logPath Path to the log file. - */ - public HDFSTransactionLog(final FileSystem fs, final Configuration hConf, - final Path logPath, long timestamp, MetricsCollector metricsCollector) { - super(timestamp, metricsCollector); - this.fs = fs; - this.hConf = hConf; - this.logPath = logPath; - } - - @Override - protected TransactionLogWriter createWriter() throws IOException { - return new LogWriter(fs, hConf, logPath); - } - - @Override - public String getName() { - return logPath.getName(); - } - - @Override - public TransactionLogReader getReader() throws IOException { - FileStatus status = fs.getFileStatus(logPath); - long length = status.getLen(); - - TransactionLogReader reader = null; - // check if this file needs to be recovered due to failure - // Check for possibly empty file. With appends, currently Hadoop reports a - // zero length even if the file has been sync'd. Revisit if HDFS-376 or - // HDFS-878 is committed. - if (length <= 0) { - LOG.warn("File " + logPath + " might be still open, length is 0"); - } - - try { - HDFSUtil hdfsUtil = new HDFSUtil(); - hdfsUtil.recoverFileLease(fs, logPath, hConf); - try { - FileStatus newStatus = fs.getFileStatus(logPath); - LOG.info("New file size for " + logPath + " is " + newStatus.getLen()); - SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf); - reader = new HDFSTransactionLogReaderSupplier(fileReader).get(); - } catch (EOFException e) { - if (length <= 0) { - // TODO should we ignore an empty, not-last log file if skip.errors - // is false? Either way, the caller should decide what to do. E.g. - // ignore if this is the last log in sequence. - // TODO is this scenario still possible if the log has been - // recovered (i.e. closed) - LOG.warn("Could not open " + logPath + " for reading. File is empty", e); - return null; - } else { - // EOFException being ignored - return null; - } - } - } catch (IOException e) { - throw e; - } - return reader; - } - - @VisibleForTesting - static final class LogWriter implements TransactionLogWriter { - private final SequenceFile.Writer internalWriter; - public LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException { - // TODO: retry a few times to ride over transient failures? 
- SequenceFile.Metadata metadata = new SequenceFile.Metadata(); - metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY), - new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION))); - - this.internalWriter = SequenceFile.createWriter(fs, hConf, logPath, LongWritable.class, TransactionEdit.class, - SequenceFile.CompressionType.NONE, null, null, metadata); - LOG.debug("Created a new TransactionLog writer for " + logPath); - } - - @Override - public void append(Entry entry) throws IOException { - internalWriter.append(entry.getKey(), entry.getEdit()); - } - - @Override - public void commitMarker(int count) throws IOException { - CommitMarkerCodec.writeMarker(internalWriter, count); - } - - @Override - public void sync() throws IOException { - internalWriter.syncFs(); - } - - @Override - public void close() throws IOException { - internalWriter.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderSupplier.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderSupplier.java deleted file mode 100644 index c407945..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderSupplier.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.TxConstants; -import com.google.common.base.Supplier; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; - -/** - * Provides the correct version of {@link TransactionLogReader}, based on the log's version metadata, - * to read HDFS Transaction Logs. - */ -public class HDFSTransactionLogReaderSupplier implements Supplier<TransactionLogReader> { - private final SequenceFile.Reader reader; - private final byte version; - private TransactionLogReader logReader; - - public HDFSTransactionLogReaderSupplier(SequenceFile.Reader reader) { - this.reader = reader; - Text versionInfo = reader.getMetadata().get(new Text(TxConstants.TransactionLog.VERSION_KEY)); - this.version = versionInfo == null ? 
1 : Byte.parseByte(versionInfo.toString()); - } - - @Override - public TransactionLogReader get() { - if (logReader != null) { - return logReader; - } - - switch (version) { - case 2: - logReader = new HDFSTransactionLogReaderV2(reader); - return logReader; - case 1: - logReader = new HDFSTransactionLogReaderV1(reader); - return logReader; - default: - throw new IllegalArgumentException(String.format("Invalid version %s found in the Transaction Log", version)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV1.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV1.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV1.java deleted file mode 100644 index cb2ce7c..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV1.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.EOFException; -import java.io.IOException; - -/** - * {@link TransactionLogReader} that can read v1 (default) version of Transaction logs. The logs are expected to - * have a sequence of {@link TransactionEdit}s. - */ -public class HDFSTransactionLogReaderV1 implements TransactionLogReader { - private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV1.class); - private final SequenceFile.Reader reader; - private final LongWritable key; - private boolean closed; - - public HDFSTransactionLogReaderV1(SequenceFile.Reader reader) { - this.reader = reader; - this.key = new LongWritable(); - } - - @Override - public TransactionEdit next() throws IOException { - return next(new TransactionEdit()); - } - - @Override - public TransactionEdit next(TransactionEdit reuse) throws IOException { - if (closed) { - return null; - } - - try { - boolean successful = reader.next(key, reuse); - return successful ? reuse : null; - } catch (EOFException e) { - LOG.warn("Hit an unexpected EOF while trying to read the Transaction Edit. 
Skipping the entry.", e); - return null; - } - } - - @Override - public void close() throws IOException { - if (closed) { - return; - } - reader.close(); - closed = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV2.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV2.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV2.java deleted file mode 100644 index 6981a3b..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLogReaderV2.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.EOFException; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Queue; - -/** - * {@link TransactionLogReader} that can read the v2 version of transaction logs. The logs are expected to - * contain commit markers that indicate the size of the batch of {@link TransactionEdit}s (following the marker) - * that were synced together. If the expected number of {@link TransactionEdit}s is not present, that set of - * {@link TransactionEdit}s is discarded.
- */ -public class HDFSTransactionLogReaderV2 implements TransactionLogReader { - private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class); - - private final SequenceFile.Reader reader; - private final Queue<TransactionEdit> transactionEdits; - private final CommitMarkerCodec commitMarkerCodec; - private final LongWritable key; - - private boolean closed; - - public HDFSTransactionLogReaderV2(SequenceFile.Reader reader) { - this.reader = reader; - this.transactionEdits = new ArrayDeque<>(); - this.key = new LongWritable(); - this.commitMarkerCodec = new CommitMarkerCodec(); - } - - @Override - public void close() throws IOException { - if (closed) { - return; - } - try { - commitMarkerCodec.close(); - } finally { - reader.close(); - closed = true; - } - } - - @Override - public TransactionEdit next() throws IOException { - return next(null); - } - - @Override - public TransactionEdit next(TransactionEdit reuse) throws IOException { - if (closed) { - return null; - } - - if (!transactionEdits.isEmpty()) { - return transactionEdits.remove(); - } - - // Fetch the 'marker' and read 'marker' number of edits - populateTransactionEdits(); - return transactionEdits.poll(); - } - - private void populateTransactionEdits() throws IOException { - // read the marker to determine numEntries to read. - int numEntries = 0; - try { - // can throw EOFException if reading of incomplete commit marker, no other action required since we can safely - // ignore this - numEntries = commitMarkerCodec.readMarker(reader); - } catch (EOFException e) { - LOG.warn("Reached EOF in log while trying to read commit marker", e); - } - - for (int i = 0; i < numEntries; i++) { - TransactionEdit edit = new TransactionEdit(); - try { - if (reader.next(key, edit)) { - transactionEdits.add(edit); - } else { - throw new EOFException("Attempt to read TransactionEdit failed."); - } - } catch (EOFException e) { - // we have reached EOF before reading back numEntries, we clear the partial list and return. - LOG.warn("Reached EOF in log before reading {} entries. Ignoring all {} edits since the last marker", - numEntries, transactionEdits.size(), e); - transactionEdits.clear(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java deleted file mode 100644 index bc7fb9f..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java +++ /dev/null @@ -1,492 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.TxConstants; -import co.cask.tephra.metrics.MetricsCollector; -import co.cask.tephra.metrics.TxMetricsCollector; -import co.cask.tephra.snapshot.SnapshotCodecProvider; -import co.cask.tephra.util.ConfigurationFactory; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.io.CountingInputStream; -import com.google.common.primitives.Longs; -import com.google.inject.Inject; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import javax.annotation.Nullable; - -/** - * Handles persistence of transaction snapshot and logs to a directory in HDFS. - * - * The directory used for file storage is configured using the {@code data.tx.snapshot.dir} configuration property. - * Both snapshot and transaction log files are suffixed with a timestamp to allow easy ordering. Snapshot files - * are written with the filename "snapshot.<timestamp>". Transaction log files are written with the filename - * "txlog.<timestamp>". - */ -public class HDFSTransactionStateStorage extends AbstractTransactionStateStorage { - private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionStateStorage.class); - - private static final String SNAPSHOT_FILE_PREFIX = "snapshot."; - private static final String TMP_SNAPSHOT_FILE_PREFIX = ".in-progress.snapshot."; - private static final String LOG_FILE_PREFIX = "txlog."; - - private static final PathFilter SNAPSHOT_FILE_FILTER = new PathFilter() { - @Override - public boolean accept(Path path) { - return path.getName().startsWith(SNAPSHOT_FILE_PREFIX); - } - }; - - // buffer size used for HDFS reads and writes - private static final int BUFFER_SIZE = 16384; - - private final Configuration hConf; - private final String configuredSnapshotDir; - private final MetricsCollector metricsCollector; - private FileSystem fs; - private Path snapshotDir; - - @Inject - public HDFSTransactionStateStorage(Configuration hConf, SnapshotCodecProvider codecProvider, - MetricsCollector metricsCollector) { - super(codecProvider); - this.hConf = hConf; - this.configuredSnapshotDir = hConf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR); - this.metricsCollector = metricsCollector; - } - - @Override - protected void startUp() throws Exception { - Preconditions.checkState(configuredSnapshotDir != null, - "Snapshot directory is not configured. Please set " + TxConstants.Manager.CFG_TX_SNAPSHOT_DIR + - " in configuration."); - String hdfsUser = hConf.get(TxConstants.Manager.CFG_TX_HDFS_USER); - if (hdfsUser == null || UserGroupInformation.isSecurityEnabled()) { - if (hdfsUser != null && LOG.isDebugEnabled()) { - LOG.debug("Ignoring configuration {}={}, running on secure Hadoop", - TxConstants.Manager.CFG_TX_HDFS_USER, hdfsUser); - } - // NOTE: we can start multiple times this storage. 
As hdfs uses per-jvm cache, we want to create new fs instead - // of getting closed one - fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf); - } else { - fs = FileSystem.newInstance(FileSystem.getDefaultUri(hConf), hConf, hdfsUser); - } - snapshotDir = new Path(configuredSnapshotDir); - LOG.info("Using snapshot dir " + snapshotDir); - } - - @Override - protected void shutDown() throws Exception { - fs.close(); - } - - @Override - public void writeSnapshot(TransactionSnapshot snapshot) throws IOException { - // create a temporary file, and save the snapshot - Path snapshotTmpFile = new Path(snapshotDir, TMP_SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp()); - LOG.debug("Writing snapshot to temporary file {}", snapshotTmpFile); - - FSDataOutputStream out = fs.create(snapshotTmpFile, false, BUFFER_SIZE); - // encode the snapshot and stream the serialized version to the file - try { - codecProvider.encode(out, snapshot); - } finally { - out.close(); - } - - // move the temporary file into place with the correct filename - Path finalFile = new Path(snapshotDir, SNAPSHOT_FILE_PREFIX + snapshot.getTimestamp()); - fs.rename(snapshotTmpFile, finalFile); - LOG.debug("Completed snapshot to file {}", finalFile); - } - - @Override - public TransactionSnapshot getLatestSnapshot() throws IOException { - InputStream in = getLatestSnapshotInputStream(); - if (in == null) { - return null; - } - try { - return readSnapshotInputStream(in); - } finally { - in.close(); - } - } - - @Override - public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException { - InputStream in = getLatestSnapshotInputStream(); - if (in == null) { - return null; - } - try { - return readTransactionVisibilityStateFromInputStream(in); - } finally { - in.close(); - } - } - - private InputStream getLatestSnapshotInputStream() throws IOException { - TimestampedFilename[] snapshots = listSnapshotFiles(); - Arrays.sort(snapshots); - if (snapshots.length > 0) { - // last is the most recent - return fs.open(snapshots[snapshots.length - 1].getPath(), BUFFER_SIZE); - } - - LOG.info("No snapshot files found in {}", snapshotDir); - return null; - } - - private TransactionSnapshot readSnapshotInputStream(InputStream in) throws IOException { - CountingInputStream countingIn = new CountingInputStream(in); - TransactionSnapshot snapshot = codecProvider.decode(countingIn); - LOG.info("Read encoded transaction snapshot of {} bytes", countingIn.getCount()); - return snapshot; - } - - private TransactionVisibilityState readTransactionVisibilityStateFromInputStream(InputStream in) throws IOException { - CountingInputStream countingIn = new CountingInputStream(in); - TransactionVisibilityState state = codecProvider.decodeTransactionVisibilityState(countingIn); - LOG.info("Read encoded transaction snapshot of {} bytes", countingIn.getCount()); - return state; - } - - private TransactionSnapshot readSnapshotFile(Path filePath) throws IOException { - FSDataInputStream in = fs.open(filePath, BUFFER_SIZE); - try { - return readSnapshotInputStream(in); - } finally { - in.close(); - } - } - - private TimestampedFilename[] listSnapshotFiles() throws IOException { - FileStatus[] snapshotFileStatuses = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER); - TimestampedFilename[] snapshotFiles = new TimestampedFilename[snapshotFileStatuses.length]; - for (int i = 0; i < snapshotFileStatuses.length; i++) { - snapshotFiles[i] = new TimestampedFilename(snapshotFileStatuses[i].getPath()); - } - return snapshotFiles; - } - - 
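[Editorial sketch, not part of the commit above: the snapshot handling in this file names snapshot files "snapshot.<timestamp>" and treats the file with the numerically largest timestamp suffix as the latest one. The standalone Java illustration below shows that selection logic under those assumptions; the class name and sample timestamps are hypothetical.]

import java.util.Arrays;
import java.util.Comparator;

public class LatestSnapshotExample {
  public static void main(String[] args) {
    // Hypothetical listing of a snapshot directory; the newest file is not necessarily listed last.
    String[] names = { "snapshot.1000", "snapshot.3000", "snapshot.2000" };
    // The latest snapshot is the file with the numerically largest ".<timestamp>" suffix.
    String latest = Arrays.stream(names)
        .max(Comparator.comparingLong(n -> Long.parseLong(n.substring(n.lastIndexOf('.') + 1))))
        .orElseThrow(IllegalStateException::new);
    System.out.println(latest); // prints: snapshot.3000
  }
}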
@Override - public long deleteOldSnapshots(int numberToKeep) throws IOException { - TimestampedFilename[] snapshots = listSnapshotFiles(); - if (snapshots.length == 0) { - return -1; - } - Arrays.sort(snapshots, Collections.reverseOrder()); - if (snapshots.length <= numberToKeep) { - // nothing to remove, oldest timestamp is the last snapshot - return snapshots[snapshots.length - 1].getTimestamp(); - } - int toRemoveCount = snapshots.length - numberToKeep; - TimestampedFilename[] toRemove = new TimestampedFilename[toRemoveCount]; - System.arraycopy(snapshots, numberToKeep, toRemove, 0, toRemoveCount); - - for (TimestampedFilename f : toRemove) { - LOG.debug("Removing old snapshot file {}", f.getPath()); - fs.delete(f.getPath(), false); - } - long oldestTimestamp = snapshots[numberToKeep - 1].getTimestamp(); - LOG.debug("Removed {} old snapshot files prior to {}", toRemoveCount, oldestTimestamp); - return oldestTimestamp; - } - - @Override - public List<String> listSnapshots() throws IOException { - FileStatus[] files = fs.listStatus(snapshotDir, SNAPSHOT_FILE_FILTER); - return Lists.transform(Arrays.asList(files), new Function<FileStatus, String>() { - @Nullable - @Override - public String apply(@Nullable FileStatus input) { - return input.getPath().getName(); - } - }); - } - - @Override - public List<TransactionLog> getLogsSince(long timestamp) throws IOException { - FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(timestamp, Long.MAX_VALUE)); - TimestampedFilename[] timestampedFiles = new TimestampedFilename[statuses.length]; - for (int i = 0; i < statuses.length; i++) { - timestampedFiles[i] = new TimestampedFilename(statuses[i].getPath()); - } - return Lists.transform(Arrays.asList(timestampedFiles), new Function<TimestampedFilename, TransactionLog>() { - @Nullable - @Override - public TransactionLog apply(@Nullable TimestampedFilename input) { - return openLog(input.getPath(), input.getTimestamp()); - } - }); - } - - @Override - public TransactionLog createLog(long timestamp) throws IOException { - Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timestamp); - return openLog(newLog, timestamp); - } - - private TransactionLog openLog(Path path, long timestamp) { - return new HDFSTransactionLog(fs, hConf, path, timestamp, metricsCollector); - } - - @Override - public void deleteLogsOlderThan(long timestamp) throws IOException { - FileStatus[] statuses = fs.listStatus(snapshotDir, new LogFileFilter(0, timestamp)); - int removedCnt = 0; - for (FileStatus status : statuses) { - LOG.debug("Removing old transaction log {}", status.getPath()); - if (fs.delete(status.getPath(), false)) { - removedCnt++; - } else { - LOG.error("Failed to delete transaction log file {}", status.getPath()); - } - } - LOG.debug("Removed {} transaction logs older than {}", removedCnt, timestamp); - } - - @Override - public void setupStorage() throws IOException { - if (!fs.exists(snapshotDir)) { - LOG.info("Creating snapshot dir at {}", snapshotDir); - fs.mkdirs(snapshotDir); - } else { - Preconditions.checkState(fs.isDirectory(snapshotDir), - "Configured snapshot directory " + snapshotDir + " is not a directory!"); - } - } - - @Override - public List<String> listLogs() throws IOException { - FileStatus[] files = fs.listStatus(snapshotDir, new LogFileFilter(0, Long.MAX_VALUE)); - return Lists.transform(Arrays.asList(files), new Function<FileStatus, String>() { - @Nullable - @Override - public String apply(@Nullable FileStatus input) { - return input.getPath().getName(); - } - }); - } - - 
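[Editorial sketch, not part of the commit above: getLogsSince, deleteLogsOlderThan and listLogs all select transaction log files by parsing the numeric suffix of "txlog.<timestamp>" filenames and testing it against a half-open time range (start inclusive, end exclusive), as implemented by the LogFileFilter defined just below. A standalone version of that check follows, with hypothetical class and method names.]

public class LogRangeFilterExample {

  // Accepts "txlog.<timestamp>" names whose timestamp falls in [startTime, endTime).
  static boolean accept(String fileName, long startTime, long endTime) {
    if (!fileName.startsWith("txlog.")) {
      return false;
    }
    String[] parts = fileName.split("\\.");
    if (parts.length != 2) {
      return false;
    }
    try {
      long fileTime = Long.parseLong(parts[1]);
      return fileTime >= startTime && fileTime < endTime;
    } catch (NumberFormatException e) {
      return false; // malformed suffix, ignore the file
    }
  }

  public static void main(String[] args) {
    System.out.println(accept("txlog.100", 100, 200));     // true:  start time is inclusive
    System.out.println(accept("txlog.200", 100, 200));     // false: end time is exclusive
    System.out.println(accept("snapshot.150", 100, 200));  // false: wrong prefix
  }
}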
@Override - public String getLocation() { - return snapshotDir.toString(); - } - - private static class LogFileFilter implements PathFilter { - // starting time of files to include (inclusive) - private final long startTime; - // ending time of files to include (exclusive) - private final long endTime; - - public LogFileFilter(long startTime, long endTime) { - this.startTime = startTime; - this.endTime = endTime; - } - - @Override - public boolean accept(Path path) { - if (path.getName().startsWith(LOG_FILE_PREFIX)) { - String[] parts = path.getName().split("\\."); - if (parts.length == 2) { - try { - long fileTime = Long.parseLong(parts[1]); - return fileTime >= startTime && fileTime < endTime; - } catch (NumberFormatException ignored) { - LOG.warn("Filename {} did not match the expected pattern prefix.<timestamp>", path.getName()); - } - } - } - return false; - } - } - - /** - * Represents a filename composed of a prefix and a ".timestamp" suffix. This is useful for manipulating both - * snapshot and transaction log filenames. - */ - private static class TimestampedFilename implements Comparable<TimestampedFilename> { - private Path path; - private String prefix; - private long timestamp; - - public TimestampedFilename(Path path) { - this.path = path; - String[] parts = path.getName().split("\\."); - if (parts.length != 2) { - throw new IllegalArgumentException("Filename " + path.getName() + - " did not match the expected pattern prefix.timestamp"); - } - prefix = parts[0]; - timestamp = Long.parseLong(parts[1]); - } - - public Path getPath() { - return path; - } - - public String getPrefix() { - return prefix; - } - - public long getTimestamp() { - return timestamp; - } - - @Override - public int compareTo(TimestampedFilename other) { - int res = prefix.compareTo(other.getPrefix()); - if (res == 0) { - res = Longs.compare(timestamp, other.getTimestamp()); - } - return res; - } - } - - // TODO move this out as a separate command line tool - private enum CLIMode { SNAPSHOT, TXLOG }; - /** - * Reads a transaction state snapshot or transaction log from HDFS and prints the entries to stdout. - * - * Supports the following options: - * -s read snapshot state (defaults to the latest) - * -l read a transaction log - * [filename] reads the given file - * @param args - */ - public static void main(String[] args) { - List<String> filenames = Lists.newArrayList(); - CLIMode mode = null; - for (String arg : args) { - if ("-s".equals(arg)) { - mode = CLIMode.SNAPSHOT; - } else if ("-l".equals(arg)) { - mode = CLIMode.TXLOG; - } else if ("-h".equals(arg)) { - printUsage(null); - } else { - filenames.add(arg); - } - } - - if (mode == null) { - printUsage("ERROR: Either -s or -l is required to set mode.", 1); - } - - Configuration config = new ConfigurationFactory().get(); - - // Use the no-op metrics collector. 
We are being run as a command line tool, so there are no relevant metrics - // to report - HDFSTransactionStateStorage storage = - new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()); - storage.startAndWait(); - try { - switch (mode) { - case SNAPSHOT: - try { - if (filenames.isEmpty()) { - TransactionSnapshot snapshot = storage.getLatestSnapshot(); - printSnapshot(snapshot); - } - for (String file : filenames) { - Path path = new Path(file); - TransactionSnapshot snapshot = storage.readSnapshotFile(path); - printSnapshot(snapshot); - System.out.println(); - } - } catch (IOException ioe) { - System.err.println("Error reading snapshot files: " + ioe.getMessage()); - ioe.printStackTrace(); - System.exit(1); - } - break; - case TXLOG: - if (filenames.isEmpty()) { - printUsage("ERROR: At least one transaction log filename is required!", 1); - } - for (String file : filenames) { - TimestampedFilename timestampedFilename = new TimestampedFilename(new Path(file)); - TransactionLog log = storage.openLog(timestampedFilename.getPath(), timestampedFilename.getTimestamp()); - printLog(log); - System.out.println(); - } - break; - } - } finally { - storage.stop(); - } - } - - private static void printUsage(String message) { - printUsage(message, 0); - } - - private static void printUsage(String message, int exitCode) { - if (message != null) { - System.out.println(message); - } - System.out.println("Usage: java " + HDFSTransactionStateStorage.class.getName() + " (-s|-l) file1 [file2...]"); - System.out.println(); - System.out.println("\t-s\tRead files as transaction state snapshots (will default to latest if no file given)"); - System.out.println("\t-l\tRead files as transaction logs [filename is required]"); - System.out.println("\t-h\tPrint this message"); - System.exit(exitCode); - } - - private static void printSnapshot(TransactionSnapshot snapshot) { - Date snapshotDate = new Date(snapshot.getTimestamp()); - System.out.println("TransactionSnapshot at " + snapshotDate.toString()); - System.out.println("\t" + snapshot.toString()); - } - - private static void printLog(TransactionLog log) { - try { - System.out.println("TransactionLog " + log.getName()); - TransactionLogReader reader = log.getReader(); - TransactionEdit edit; - long seq = 0; - while ((edit = reader.next()) != null) { - System.out.println(String.format(" %d: %s", seq++, edit.toString())); - } - } catch (IOException ioe) { - System.err.println("ERROR reading log " + log.getName() + ": " + ioe.getMessage()); - ioe.printStackTrace(); - } - } -}
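[Editorial sketch, not part of the commit above: before this commit removed it, HDFSTransactionStateStorage was used much as its own main() method uses it: construct it with a Configuration, a SnapshotCodecProvider and a metrics collector, start it, read state, then stop it. A minimal sketch along those lines follows; the snapshot directory path is a placeholder.]

import co.cask.tephra.TxConstants;
import co.cask.tephra.metrics.TxMetricsCollector;
import co.cask.tephra.persist.HDFSTransactionStateStorage;
import co.cask.tephra.persist.TransactionSnapshot;
import co.cask.tephra.snapshot.SnapshotCodecProvider;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;

public class ReadLatestSnapshotExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Placeholder snapshot directory; in a real deployment this points at the HDFS directory
    // holding the "snapshot.<timestamp>" and "txlog.<timestamp>" files.
    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, "/tmp/tephra/snapshots");

    HDFSTransactionStateStorage storage =
        new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
    storage.startAndWait(); // opens the FileSystem and resolves the snapshot directory
    try {
      TransactionSnapshot snapshot = storage.getLatestSnapshot(); // null when no snapshot file exists yet
      System.out.println(snapshot == null ? "no snapshot found" : snapshot.toString());
    } finally {
      storage.stop(); // shuts down and closes the FileSystem
    }
  }
}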
