http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java new file mode 100644 index 0000000..ebbbd18 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.util; + +import com.google.inject.Provider; +import com.google.inject.ProvisionException; +import org.apache.twill.internal.utils.Instances; + +/** + * Common class factory behavior for classes which need specific implementations depending on HBase versions. + * Specific factories can subclass this class and simply plug in the class names for their implementations. + * + * @param <T> Version specific class provided by this factory. 
+ */ +public abstract class HBaseVersionSpecificFactory<T> implements Provider<T> { + @Override + public T get() { + T instance = null; + try { + switch (HBaseVersion.get()) { + case HBASE_94: + throw new ProvisionException("HBase 0.94 is no longer supported. Please upgrade to HBase 0.96 or newer."); + case HBASE_96: + instance = createInstance(getHBase96Classname()); + break; + case HBASE_98: + instance = createInstance(getHBase98Classname()); + break; + case HBASE_10: + instance = createInstance(getHBase10Classname()); + break; + case HBASE_10_CDH: + instance = createInstance(getHBase10CDHClassname()); + break; + case HBASE_11: + case HBASE_12_CDH: + instance = createInstance(getHBase11Classname()); + break; + case UNKNOWN: + throw new ProvisionException("Unknown HBase version: " + HBaseVersion.getVersionString()); + } + } catch (ClassNotFoundException cnfe) { + throw new ProvisionException(cnfe.getMessage(), cnfe); + } + return instance; + } + + protected T createInstance(String className) throws ClassNotFoundException { + Class clz = Class.forName(className); + return (T) Instances.newInstance(clz); + } + + protected abstract String getHBase96Classname(); + protected abstract String getHBase98Classname(); + protected abstract String getHBase10Classname(); + protected abstract String getHBase10CDHClassname(); + protected abstract String getHBase11Classname(); +}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java new file mode 100644 index 0000000..08b1545 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java @@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tephra.util;

import com.google.common.primitives.Longs;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionType;
import org.apache.tephra.TxConstants;
import org.apache.tephra.persist.TransactionVisibilityState;

import java.util.Map;

/**
 * Static helpers for working with transaction timestamps, TTLs and visibility state.
 */
public class TxUtils {

  // Any cell whose timestamp is below MAX_NON_TX_TIMESTAMP is treated as pre-existing data, i.e. data
  // that was written before the table was converted into a transactional table using Tephra.
  //
  // The threshold is 1.1 times the current wall-clock time in milliseconds. This is a safe cut-off that
  // yields neither false positives nor false negatives.
  //
  // To see why, assume the earliest transactional timestamp written by Tephra is in year 2000 and the
  // latest one that will ever be written is in year 2200.
  //   01-Jan-2000 GMT is  946684800000 milliseconds since epoch.
  //   31-Dec-2200 GMT is 7289654399000 milliseconds since epoch.
  //
  // If transactions were enabled on a table on 01-Jan-2000, then 1.1 * 946684800000 corresponds to
  // 31-Dec-2002. From 01-Jan-2000 onwards that threshold still classifies every pre-existing cell
  // timestamp (<= 946684800000) as non-transactional, while any transactional timestamp written at that
  // instant is greater than 946684800000000000 (> year 31969) and therefore far above the threshold.
  //
  // At the other end, if transactions were enabled on a table on 31-Dec-2200, then 1.1 * 7289654399000
  // corresponds to 07-Feb-2224. Again every pre-existing cell timestamp (<= 7289654399000) is classified
  // as non-transactional, while a transactional timestamp written at that instant is greater than
  // 7289654399000000000 (> year 232969).
  private static final long MAX_NON_TX_TIMESTAMP = (long) (System.currentTimeMillis() * 1.1);

  /**
   * Computes the oldest timestamp that is still visible to the given transaction, taking the TTLs of all
   * column families into account. When no effective TTL applies (see {@code getMaxTTL}), the result is {@code 0}.
   *
   * @param ttlByFamily A map of column family name to TTL value (in milliseconds)
   * @param tx The current transaction
   * @return The oldest timestamp that will be visible for the given transaction and TTL configuration
   */
  public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx) {
    long maxTTL = getMaxTTL(ttlByFamily);
    if (maxTTL == Long.MAX_VALUE) {
      // no TTL in effect: everything is visible
      return 0;
    }
    // we know that data will not be cleaned up while this tx is running up to this point as janitor uses it
    return tx.getVisibilityUpperBound() - maxTTL * TxConstants.MAX_TX_PER_MS;
  }

  /**
   * Variant of {@link #getOldestVisibleTimestamp(Map, Transaction)} that can also produce a bound suitable for
   * reading non-transactional data. In that mode the bound is derived from the current wall-clock time in
   * milliseconds instead of the transaction's visibility upper bound.
   *
   * @param ttlByFamily A map of column family name to TTL value (in milliseconds)
   * @param tx The current transaction
   * @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data
   * @return The oldest timestamp that will be visible for the given transaction and TTL configuration
   */
  public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx, boolean readNonTxnData) {
    if (!readNonTxnData) {
      return getOldestVisibleTimestamp(ttlByFamily, tx);
    }

    long maxTTL = getMaxTTL(ttlByFamily);
    if (maxTTL < Long.MAX_VALUE) {
      return System.currentTimeMillis() - maxTTL;
    }
    return 0;
  }

  /**
   * Returns the upper bound to use for time-range operations performed on behalf of the given transaction.
   *
   * @param tx The current transaction
   * @return The maximum timestamp (exclusive) to use for time-range operations
   */
  public static long getMaxVisibleTimestamp(Transaction tx) {
    // The time range end is exclusive but the write pointer itself must be readable, hence the +1.
    // A write pointer of Long.MAX_VALUE is returned unchanged to avoid overflow.
    if (tx.getWritePointer() < Long.MAX_VALUE) {
      return tx.getWritePointer() + 1;
    }
    return tx.getWritePointer();
  }

  /**
   * Builds a "dummy" transaction from the given visibility state. It is not a real transaction: it has not
   * been started, data should not be written with it, and it cannot be committed. It is still useful for
   * filtering data according to the visibility state. The write pointer is set to {@code Long.MAX_VALUE}
   * rather than the state's actual write pointer, so that no cell is mistaken for one written by this
   * transaction (and therefore visible).
   */
  public static Transaction createDummyTransaction(TransactionVisibilityState txVisibilityState) {
    long readPointer = txVisibilityState.getReadPointer();
    long[] invalidTxs = Longs.toArray(txVisibilityState.getInvalid());
    long[] inProgressTxs = Longs.toArray(txVisibilityState.getInProgress().keySet());
    long firstShortTx = TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress());
    return new Transaction(readPointer, Long.MAX_VALUE, invalidTxs, inProgressTxs, firstShortTx,
                           TransactionType.SHORT);
  }

  /**
   * Returns the write pointer of the first "short" (not long-running) transaction in iteration order of the
   * in-progress map, or {@link Transaction#NO_TX_IN_PROGRESS} if there is none.
   */
  public static long getFirstShortInProgress(Map<Long, TransactionManager.InProgressTx> inProgress) {
    for (Map.Entry<Long, TransactionManager.InProgressTx> candidate : inProgress.entrySet()) {
      if (!candidate.getValue().isLongRunning()) {
        return candidate.getKey();
      }
    }
    return Transaction.NO_TX_IN_PROGRESS;
  }

  /**
   * Returns the timestamp to use when calculating time to live for the given cell timestamp.
   * Timestamps of pre-existing, non-transactional cells are scaled by {@link TxConstants#MAX_TX_PER_MS};
   * all other timestamps are returned unchanged.
   */
  public static long getTimestampForTTL(long cellTs) {
    if (isPreExistingVersion(cellTs)) {
      return cellTs * TxConstants.MAX_TX_PER_MS;
    }
    return cellTs;
  }

  /**
   * Returns the largest of the given TTL values. Returns {@code Long.MAX_VALUE} if any column family has no
   * TTL set (a value less than or equal to zero), or if the map is empty.
   */
  private static long getMaxTTL(Map<byte[], Long> ttlByFamily) {
    long largest = 0;
    for (Long familyTTL : ttlByFamily.values()) {
      long effectiveTTL = familyTTL <= 0 ? Long.MAX_VALUE : familyTTL;
      if (effectiveTTL > largest) {
        largest = effectiveTTL;
      }
    }
    if (largest == 0) {
      return Long.MAX_VALUE;
    }
    return largest;
  }

  /**
   * Returns true if the version is below the non-transactional timestamp threshold, i.e. it is assumed to
   * have been written before the table was converted into a transactional table; false otherwise.
   */
  public static boolean isPreExistingVersion(long version) {
    return MAX_NON_TX_TIMESTAMP > version;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/DefaultFenceWait.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/DefaultFenceWait.java b/tephra-core/src/main/java/org/apache/tephra/visibility/DefaultFenceWait.java new file mode 100644 index 0000000..7c1af8e --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/visibility/DefaultFenceWait.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tephra.visibility;

import com.google.common.base.Stopwatch;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Default implementation of {@link FenceWait}: repeatedly starts and finishes the fence transaction until it
 * commits or the timeout elapses.
 */
public class DefaultFenceWait implements FenceWait {
  private static final Logger LOG = LoggerFactory.getLogger(DefaultFenceWait.class);

  /** Upper bound for the pause between two attempts: 500 milliseconds, in microseconds. */
  private static final long MAX_SLEEP_MICROS = 500 * 1000;
  /** Lower bound for the pause between two attempts, in microseconds. */
  private static final long MIN_SLEEP_MICROS = 1;

  private final TransactionContext txContext;

  DefaultFenceWait(TransactionContext txContext) {
    this.txContext = txContext;
  }

  /**
   * Starts and finishes the fence transaction in a loop. Returns as soon as one attempt finishes successfully.
   * After a failed attempt the transaction is aborted and the thread sleeps for one tenth of the timeout
   * (clamped to between 1 microsecond and 500 milliseconds) before the next attempt.
   *
   * @param timeout maximum time to keep retrying
   * @param timeUnit unit of {@code timeout}
   * @throws TransactionFailureException if starting or aborting the fence transaction fails
   * @throws InterruptedException if the thread is interrupted while sleeping between attempts
   * @throws TimeoutException if no attempt succeeded before the timeout elapsed
   */
  @Override
  public void await(long timeout, TimeUnit timeUnit)
    throws TransactionFailureException, InterruptedException, TimeoutException {
    Stopwatch elapsed = new Stopwatch().start();

    // Pause for a tenth of the timeout, clamped to [MIN_SLEEP_MICROS, MAX_SLEEP_MICROS]
    long sleepTimeMicros = timeUnit.toMicros(timeout) / 10;
    if (sleepTimeMicros > MAX_SLEEP_MICROS) {
      sleepTimeMicros = MAX_SLEEP_MICROS;
    }
    if (sleepTimeMicros < MIN_SLEEP_MICROS) {
      sleepTimeMicros = MIN_SLEEP_MICROS;
    }

    while (elapsed.elapsedTime(timeUnit) < timeout) {
      // start() is deliberately outside the try: a failure to start is propagated, not retried
      txContext.start();
      try {
        txContext.finish();
        return;
      } catch (TransactionFailureException e) {
        LOG.error("Got exception waiting for fence. Sleeping for {} microseconds", sleepTimeMicros, e);
        txContext.abort();
        TimeUnit.MICROSECONDS.sleep(sleepTimeMicros);
      }
    }
    throw new TimeoutException("Timeout waiting for fence");
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/FenceWait.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/FenceWait.java b/tephra-core/src/main/java/org/apache/tephra/visibility/FenceWait.java new file mode 100644 index 0000000..e1fb246 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/visibility/FenceWait.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.visibility; + +import org.apache.tephra.TransactionFailureException; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions. + */ +public interface FenceWait { + /** + * Waits until the fence is complete, or till the timeout specified.
The fence wait transaction will get re-tried + * several times until the timeout. + * <p> + * + * If a fence wait times out then it means there are still some readers with in-progress transactions that have not + * seen the change. In this case the wait will have to be retried using the same FenceWait object. + * + * @param timeout Maximum time to wait + * @param timeUnit {@link TimeUnit} of the timeout value + * @throws TransactionFailureException when not able to start fence wait transaction + * @throws InterruptedException on any interrupt + * @throws TimeoutException when timeout is reached + */ + void await(long timeout, TimeUnit timeUnit) + throws TransactionFailureException, InterruptedException, TimeoutException; +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/ReadFence.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/ReadFence.java b/tephra-core/src/main/java/org/apache/tephra/visibility/ReadFence.java new file mode 100644 index 0000000..a156d55 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/visibility/ReadFence.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.visibility; + +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Longs; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionAware; + +import java.util.Collection; +import java.util.Collections; + +/** + * Implementation of {@link VisibilityFence} used by reader. + */ +class ReadFence implements TransactionAware { + private final byte[] fenceId; + private Transaction tx; + + public ReadFence(byte[] fenceId) { + this.fenceId = fenceId; + } + + @Override + public void startTx(Transaction tx) { + this.tx = tx; + } + + @Override + public void updateTx(Transaction tx) { + // Fences only need original transaction + } + + @Override + public Collection<byte[]> getTxChanges() { + if (tx == null) { + throw new IllegalStateException("Transaction has not started yet"); + } + return Collections.singleton(Bytes.concat(fenceId, Longs.toByteArray(tx.getTransactionId()))); + } + + @Override + public boolean commitTx() throws Exception { + // Nothing to persist + return true; + } + + @Override + public void postTxCommit() { + tx = null; + } + + @Override + public boolean rollbackTx() throws Exception { + // Nothing to rollback + return true; + } + + @Override + public String getTransactionAwareName() { + return getClass().getSimpleName(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/VisibilityFence.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/VisibilityFence.java b/tephra-core/src/main/java/org/apache/tephra/visibility/VisibilityFence.java new file mode 100644 index 0000000..5d08246 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/visibility/VisibilityFence.java @@ -0,0 +1,140 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tephra.visibility;

import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionSystemClient;

import java.util.concurrent.TimeoutException;

/**
 * VisibilityFence is used to ensure that after a given point in time, all readers see an updated change
 * that got committed.
 * <p>
 *
 * Typically a reader will never conflict with a writer, since a reader only sees the changes that were
 * committed when its transaction started. However, to ensure that after a given point all readers are aware
 * of a change, we have to introduce a conflict between a reader and a writer that act on the same data
 * concurrently.
 * <p>
 *
 * This is done by the reader indicating that it is interested in changes to a piece of data by using a fence
 * in its transaction. If there are no changes to the data when the reader tries to commit the transaction
 * containing the fence, the commit succeeds.
 * <p>
 *
 * On the other hand, a writer updates the same data in a transaction. After the write transaction is committed,
 * the writer then waits on the fence to ensure that all in-progress readers are aware of this update.
 * When the wait on the fence returns successfully, it means that
 * any in-progress readers that have not seen the change will not be allowed to commit anymore. This will
 * force the readers to start a new transaction, and this ensures that the changes made by the writer are
 * visible to the readers.
 * <p>
 *
 * In case an in-progress reader commits while the writer is waiting on the fence, the wait method will retry
 * until the given timeout.
 * <p>
 *
 * Hence a successful await on a fence ensures that any reader (using the same fence) that successfully commits
 * from this point onwards will see the change.
 *
 * <p>
 * Sample reader code:
 * <pre>
 *   <code>
 * TransactionAware fence = VisibilityFence.create(fenceId);
 * TransactionContext readTxContext = new TransactionContext(txClient, fence, table1, table2, ...);
 * readTxContext.start();
 *
 * // do operations using table1, table2, etc.
 *
 * // finally commit
 * try {
 *   readTxContext.finish();
 * } catch (TransactionConflictException e) {
 *   // handle conflict by aborting and starting over with a new transaction
 * }
 *   </code>
 * </pre>
 * <p>
 *
 * Sample writer code:
 * <pre>
 *   <code>
 * // start transaction
 * // write change
 * // commit transaction
 *
 * // Now wait on the fence (with the same fenceId as the readers) to ensure that all in-progress readers are
 * // aware of this change
 * try {
 *   FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, txClient);
 *   fenceWait.await(50000, TimeUnit.MILLISECONDS);
 * } catch (TimeoutException e) {
 *   // await timed out, the change may not be visible to all in-progress readers.
 *   // Application has two options at this point:
 *   // 1. Revert the write. Re-try the write and fence wait again.
 *   // 2. Retry only the wait with the same fenceWait object (retry logic is not shown here).
 * }
 *   </code>
 * </pre>
 *
 * fenceId in the above samples refers to any id that both the readers and writer know for a given
 * piece of data. Both readers and writer will have to use the same fenceId to synchronize on a given data.
 * Typically fenceId uniquely identifies the data in question.
 * For example, if the data is a table row, the fenceId can be composed of table name and row key.
 * If the data is a table cell, the fenceId can be composed of table name, row key, and column key.
 * <p>
 *
 * Note that in this implementation a reader may get a conflict even though it has seen the latest changes:
 * if a reader starts its transaction after the write is committed, and a writer successfully starts and
 * completes an await on the fence while that read transaction is still in progress, the reader will get a
 * conflict when committing the fence. This is because today there is no way to determine the commit time of
 * a transaction.
 */
public final class VisibilityFence {
  private VisibilityFence() {
    // Cannot instantiate this class, all functionality is through static methods.
  }

  /**
   * Used by a reader to get a fence that can be added to its transaction context.
   *
   * @param fenceId uniquely identifies the data that this fence is used to synchronize.
   *                If the data is a table cell then this id can be composed of the table name, row key
   *                and column key for the data.
   * @return {@link TransactionAware} to be added to reader's transaction context.
   */
  public static TransactionAware create(byte[] fenceId) {
    ReadFence readerFence = new ReadFence(fenceId);
    return readerFence;
  }

  /**
   * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions.
   *
   * @param fenceId uniquely identifies the data that this fence is used to synchronize.
   *                If the data is a table cell then this id can be composed of the table name, row key
   *                and column key for the data.
   * @param txClient transaction system client used to create the transaction context of the fence wait
   * @return {@link FenceWait} object
   */
  public static FenceWait prepareWait(byte[] fenceId, TransactionSystemClient txClient)
    throws TransactionFailureException, InterruptedException, TimeoutException {
    WriteFence writerFence = new WriteFence(fenceId);
    TransactionContext fenceTxContext = new TransactionContext(txClient, writerFence);
    return new DefaultFenceWait(fenceTxContext);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/WriteFence.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/WriteFence.java b/tephra-core/src/main/java/org/apache/tephra/visibility/WriteFence.java new file mode 100644 index 0000000..fe59efe --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/visibility/WriteFence.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tephra.visibility; + +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Longs; +import com.google.common.primitives.UnsignedBytes; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionAware; + +import java.util.Collection; +import java.util.TreeSet; + +/** + * Implementation used by {@link FenceWait} to wait for a {@link VisibilityFence}. + */ +class WriteFence implements TransactionAware { + private final byte[] fenceId; + private Transaction tx; + private Collection<byte[]> inProgressChanges; + + public WriteFence(byte[] fenceId) { + this.fenceId = fenceId; + } + + @Override + public void startTx(Transaction tx) { + this.tx = tx; + if (inProgressChanges == null) { + inProgressChanges = new TreeSet<>(UnsignedBytes.lexicographicalComparator()); + for (long inProgressTx : tx.getInProgress()) { + inProgressChanges.add(Bytes.concat(fenceId, Longs.toByteArray(inProgressTx))); + } + } + } + + @Override + public void updateTx(Transaction tx) { + // Fences only need original transaction + } + + @Override + public Collection<byte[]> getTxChanges() { + if (inProgressChanges == null || tx == null) { + throw new IllegalStateException("Transaction has not started yet"); + } + return inProgressChanges; + } + + @Override + public boolean commitTx() throws Exception { + // Nothing to persist + return true; + } + + @Override + public void postTxCommit() { + tx = null; + } + + @Override + public boolean rollbackTx() throws Exception { + // Nothing to rollback + return true; + } + + @Override + public String getTransactionAwareName() { + return getClass().getSimpleName(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicACLData.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicACLData.java 
b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicACLData.java new file mode 100644 index 0000000..8d95a42 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicACLData.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.zookeeper; + +import org.apache.twill.zookeeper.ACLData; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.util.List; + +/** + * A straightforward implementation of {@link ACLData}. 
+ */ +final class BasicACLData implements ACLData { + + private final List<ACL> acl; + private final Stat stat; + + BasicACLData(List<ACL> acl, Stat stat) { + this.acl = acl; + this.stat = stat; + } + + @Override + public List<ACL> getACL() { + return acl; + } + + @Override + public Stat getStat() { + return stat; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeChildren.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeChildren.java b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeChildren.java new file mode 100644 index 0000000..80edddb --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeChildren.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.zookeeper; + +import com.google.common.base.Objects; +import org.apache.twill.zookeeper.NodeChildren; +import org.apache.zookeeper.data.Stat; + +import java.util.List; + +/** + * Implementation of the {@link NodeChildren}. 
+ */ +final class BasicNodeChildren implements NodeChildren { + + private final Stat stat; + private final List<String> children; + + BasicNodeChildren(List<String> children, Stat stat) { + this.stat = stat; + this.children = children; + } + + @Override + public Stat getStat() { + return stat; + } + + @Override + public List<String> getChildren() { + return children; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || !(o instanceof NodeChildren)) { + return false; + } + + NodeChildren that = (NodeChildren) o; + return stat.equals(that.getStat()) && children.equals(that.getChildren()); + } + + @Override + public int hashCode() { + return Objects.hashCode(children, stat); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeData.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeData.java b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeData.java new file mode 100644 index 0000000..5df3475 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeData.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.zookeeper; + +import com.google.common.base.Objects; +import org.apache.twill.zookeeper.NodeData; +import org.apache.zookeeper.data.Stat; + +import java.util.Arrays; + +/** + * A straightforward implementation for {@link NodeData} with content-based equality on the data bytes. + */ +final class BasicNodeData implements NodeData { + + private final byte[] data; + private final Stat stat; + + BasicNodeData(byte[] data, Stat stat) { + this.data = data; + this.stat = stat; + } + + @Override + public Stat getStat() { + return stat; + } + + @Override + public byte[] getData() { + return data; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || !(o instanceof NodeData)) { + return false; + } + // Cast to the interface that was just checked; casting to BasicNodeData could throw ClassCastException. + NodeData that = (NodeData) o; + + return stat.equals(that.getStat()) && Arrays.equals(data, that.getData()); + } + // Hash the array content rather than its identity so that hashCode() stays consistent with equals(). + @Override + public int hashCode() { + return Objects.hashCode(Arrays.hashCode(data), stat); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java b/tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java new file mode 100644 index 0000000..2bc7a6a --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java @@ -0,0 +1,626 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.zookeeper; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.twill.common.Cancellable; +import org.apache.twill.common.Threads; +import org.apache.twill.internal.zookeeper.SettableOperationFuture; +import org.apache.twill.zookeeper.ACLData; +import org.apache.twill.zookeeper.NodeChildren; +import org.apache.twill.zookeeper.NodeData; +import org.apache.twill.zookeeper.OperationFuture; +import org.apache.twill.zookeeper.ZKClientService; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import 
java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; + +/** + * The implementation of {@link ZKClientService}. + */ +public class TephraZKClientService extends AbstractService implements ZKClientService, Watcher { + + private static final Logger LOG = LoggerFactory.getLogger(TephraZKClientService.class); + + private final String zkStr; + private final int sessionTimeout; + private final List<Watcher> connectionWatchers; + private final Multimap<String, byte[]> authInfos; + private final AtomicReference<ZooKeeper> zooKeeper; + private final Runnable stopTask; + private ExecutorService eventExecutor; + + /** + * Creates a new instance. No ZooKeeper connection is opened here; that happens in {@code doStart()}. + * @param zkStr ZooKeeper connection string + * @param sessionTimeout timeout in milliseconds + * @param connectionWatcher initial connection watcher to register; {@code null} registers none + * @param authInfos auth scheme to auth bytes, deep-copied by this constructor; must not be {@code null} + */ + public TephraZKClientService(String zkStr, int sessionTimeout, + Watcher connectionWatcher, Multimap<String, byte[]> authInfos) { + this.zkStr = zkStr; + this.sessionTimeout = sessionTimeout; + this.connectionWatchers = new CopyOnWriteArrayList<>(); + this.authInfos = copyAuthInfo(authInfos); + addConnectionWatcher(connectionWatcher); + + this.zooKeeper = new AtomicReference<>(); + this.stopTask = createStopTask(); + } + + @Override + public Long getSessionId() { + ZooKeeper zk = zooKeeper.get(); + return zk == null ? 
null : zk.getSessionId(); + } + + @Override + public String getConnectString() { + return zkStr; + } + + @Override + public Cancellable addConnectionWatcher(final Watcher watcher) { + if (watcher == null) { + return new Cancellable() { + @Override + public void cancel() { + // No-op + } + }; + } + + // Invocation of connection watchers are already done inside the event thread, + // hence no need to wrap the watcher again. + connectionWatchers.add(watcher); + return new Cancellable() { + @Override + public void cancel() { + connectionWatchers.remove(watcher); + } + }; + } + + @Override + public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) { + return create(path, data, createMode, true); + } + + @Override + public OperationFuture<String> create(String path, @Nullable byte[] data, + CreateMode createMode, boolean createParent) { + return create(path, data, createMode, createParent, ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + + @Override + public OperationFuture<String> create(String path, @Nullable byte[] data, + CreateMode createMode, Iterable<ACL> acl) { + return create(path, data, createMode, true, acl); + } + + @Override + public OperationFuture<Stat> exists(String path) { + return exists(path, null); + } + + @Override + public OperationFuture<NodeChildren> getChildren(String path) { + return getChildren(path, null); + } + + @Override + public OperationFuture<NodeData> getData(String path) { + return getData(path, null); + } + + @Override + public OperationFuture<Stat> setData(String path, byte[] data) { + return setData(path, data, -1); + } + + @Override + public OperationFuture<String> delete(String path) { + return delete(path, -1); + } + + @Override + public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl) { + return setACL(path, acl, -1); + } + + @Override + public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, + boolean createParent, Iterable<ACL> acl) { + return 
doCreate(path, data, createMode, createParent, ImmutableList.copyOf(acl), false); + } + + private OperationFuture<String> doCreate(final String path, + @Nullable final byte[] data, + final CreateMode createMode, + final boolean createParent, + final List<ACL> acl, + final boolean ignoreNodeExists) { + final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor); + getZooKeeper().create(path, data, acl, createMode, Callbacks.STRING, createFuture); + if (!createParent) { + return createFuture; + } + + // If create parent is request, return a different future + final SettableOperationFuture<String> result = SettableOperationFuture.create(path, eventExecutor); + // Watch for changes in the original future + Futures.addCallback(createFuture, new FutureCallback<String>() { + @Override + public void onSuccess(String path) { + // Propagate if creation was successful + result.set(path); + } + + @Override + public void onFailure(Throwable t) { + // See if the failure can be handled + if (updateFailureResult(t, result, path, ignoreNodeExists)) { + return; + } + // Create the parent node + String parentPath = getParent(path); + if (parentPath.isEmpty()) { + result.setException(t); + return; + } + // Watch for parent creation complete. Parent is created with the unsafe ACL. 
+ Futures.addCallback(doCreate(parentPath, null, CreateMode.PERSISTENT, + true, ZooDefs.Ids.OPEN_ACL_UNSAFE, true), new FutureCallback<String>() { + @Override + public void onSuccess(String parentPath) { + // Create the requested path again + Futures.addCallback( + doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>() { + @Override + public void onSuccess(String pathResult) { + result.set(pathResult); + } + + @Override + public void onFailure(Throwable t) { + // handle the failure + updateFailureResult(t, result, path, ignoreNodeExists); + } + }); + } + + @Override + public void onFailure(Throwable t) { + result.setException(t); + } + }); + } + + /** + * Updates the result future based on the given {@link Throwable}. + * @param t Cause of the failure + * @param result Future to be updated + * @param path Request path for the operation + * @return {@code true} if it is a failure, {@code false} otherwise. + */ + private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result, + String path, boolean ignoreNodeExists) { + // Propagate if there is error + if (!(t instanceof KeeperException)) { + result.setException(t); + return true; + } + KeeperException.Code code = ((KeeperException) t).code(); + // Node already exists, simply return success if it allows for ignoring node exists (for parent node creation). + if (ignoreNodeExists && code == KeeperException.Code.NODEEXISTS) { + // The requested path could be used because it only applies to non-sequential node + result.set(path); + return false; + } + if (code != KeeperException.Code.NONODE) { + result.setException(t); + return true; + } + return false; + } + + /** + * Gets the parent of the given path. + * @param path Path for computing its parent + * @return Parent of the given path, or empty string if the given path is the root path already. 
+ */ + private String getParent(String path) { + String parentPath = path.substring(0, path.lastIndexOf('/')); + return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath; + } + }); + + return result; + } + + @Override + public OperationFuture<Stat> exists(String path, Watcher watcher) { + SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor); + getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result); + return result; + } + + @Override + public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) { + SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor); + getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result); + return result; + } + + @Override + public OperationFuture<NodeData> getData(String path, Watcher watcher) { + SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor); + getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result); + + return result; + } + + @Override + public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) { + SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor); + getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result); + return result; + } + + @Override + public OperationFuture<String> delete(String deletePath, int version) { + SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor); + getZooKeeper().delete(deletePath, version, Callbacks.VOID, result); + return result; + } + + @Override + public OperationFuture<ACLData> getACL(String path) { + SettableOperationFuture<ACLData> result = SettableOperationFuture.create(path, eventExecutor); + getZooKeeper().getACL(path, new Stat(), Callbacks.ACL, result); + return result; + } + + @Override + public OperationFuture<Stat> setACL(String path, Iterable<ACL> 
acl, int version) { + SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor); + getZooKeeper().setACL(path, ImmutableList.copyOf(acl), version, Callbacks.STAT, result); + return result; + } + + @Override + public Supplier<ZooKeeper> getZooKeeperSupplier() { + return new Supplier<ZooKeeper>() { + @Override + public ZooKeeper get() { + return getZooKeeper(); + } + }; + } + + @Override + protected void doStart() { + // A single thread executor for all events + ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + Threads.createDaemonThreadFactory("zk-client-EventThread")); + // Just discard the execution if the executor is closed + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + eventExecutor = executor; + + try { + zooKeeper.set(createZooKeeper()); + } catch (IOException e) { + notifyFailed(e); + } + } + + @Override + protected void doStop() { + // Submit a task to the executor to make sure all pending events in the executor are fired before + // transiting this Service into STOPPED state + eventExecutor.submit(stopTask); + eventExecutor.shutdown(); + } + + /** + * @return Current {@link ZooKeeper} client. + */ + private ZooKeeper getZooKeeper() { + ZooKeeper zk = zooKeeper.get(); + Preconditions.checkArgument(zk != null, "Not connected to zooKeeper."); + return zk; + } + + /** + * Wraps the given watcher to be called from the event executor. + * @param watcher Watcher to be wrapped + * @return The wrapped Watcher + */ + private Watcher wrapWatcher(final Watcher watcher) { + if (watcher == null) { + return null; + } + return new Watcher() { + @Override + public void process(final WatchedEvent event) { + if (eventExecutor.isShutdown()) { + LOG.debug("Already shutdown. 
Discarding event: {}", event); + return; + } + eventExecutor.execute(new Runnable() { + @Override + public void run() { + try { + watcher.process(event); + } catch (Throwable t) { + LOG.error("Watcher throws exception.", t); + } + } + }); + } + }; + } + + /** + * Creates a deep copy of the given authInfos multimap. + */ + private Multimap<String, byte[]> copyAuthInfo(Multimap<String, byte[]> authInfos) { + Multimap<String, byte[]> result = ArrayListMultimap.create(); + + for (Map.Entry<String, byte[]> entry : authInfos.entries()) { + byte[] info = entry.getValue(); + result.put(entry.getKey(), info == null ? null : Arrays.copyOf(info, info.length)); + } + + return result; + } + + @Override + public void process(WatchedEvent event) { + State state = state(); + if (state == State.TERMINATED || state == State.FAILED) { + return; + } + + try { + if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) { + LOG.debug("Connected to ZooKeeper: {}", zkStr); + notifyStarted(); + return; + } + if (event.getState() == Event.KeeperState.Expired) { + LOG.info("ZooKeeper session expired: {}", zkStr); + + // When connection expired, simply reconnect again + if (state != State.RUNNING) { + return; + } + eventExecutor.submit(new Runnable() { + @Override + public void run() { + // Only reconnect if the current state is running + if (state() != State.RUNNING) { + return; + } + try { + LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr); + closeZooKeeper(zooKeeper.getAndSet(createZooKeeper())); + } catch (IOException e) { + notifyFailed(e); + } + } + }); + } + } finally { + if (event.getType() == Event.EventType.None) { + for (Watcher connectionWatcher : connectionWatchers) { + connectionWatcher.process(event); + } + } + } + } + + /** + * Creates a new ZooKeeper connection. 
+ */ + private ZooKeeper createZooKeeper() throws IOException { + ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this)); + for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) { + zk.addAuthInfo(authInfo.getKey(), authInfo.getValue()); + } + return zk; + } + + /** + * Closes the given {@link ZooKeeper} if it is not null. If the close is interrupted, + * the interruption is logged and the interrupt flag of the current thread is restored. + */ + private void closeZooKeeper(@Nullable ZooKeeper zk) { + try { + if (zk != null) { + zk.close(); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted when closing ZooKeeper", e); + Thread.currentThread().interrupt(); + } + } + + /** + * Creates a {@link Runnable} task that gets executed in the event executor to close the current + * ZooKeeper client and notify that this Service has stopped. + */ + private Runnable createStopTask() { + return new Runnable() { + @Override + public void run() { + try { + // Closing the ZK connection inside this task ensures that a ZK connection created + // after doStop() was called, but before this task has run, gets closed as well. + // That can happen with the following sequence of events: + // + // 1. The session expires, hence the expired event is triggered + // 2. The reconnect task executes. With Service.state() == RUNNING, it creates a new ZK client + // 3. Service.stop() gets called, Service.state() changes to STOPPING + // 4. The reconnect task stores the new ZK client in the zooKeeper reference + closeZooKeeper(zooKeeper.getAndSet(null)); + notifyStopped(); + } catch (Exception e) { + notifyFailed(e); + } + } + }; + } + + /** + * Collection of generic callbacks that simply reflect results into OperationFuture. 
+ */ + private static final class Callbacks { + static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback() { + @Override + @SuppressWarnings("unchecked") + public void processResult(int rc, String path, Object ctx, String name) { + SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx; + KeeperException.Code code = KeeperException.Code.get(rc); + if (code == KeeperException.Code.OK) { + result.set((name == null || name.isEmpty()) ? path : name); + return; + } + result.setException(KeeperException.create(code, result.getRequestPath())); + } + }; + + static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback() { + @Override + @SuppressWarnings("unchecked") + public void processResult(int rc, String path, Object ctx, Stat stat) { + SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx; + KeeperException.Code code = KeeperException.Code.get(rc); + if (code == KeeperException.Code.OK) { + result.set(stat); + return; + } + result.setException(KeeperException.create(code, result.getRequestPath())); + } + }; + + /** + * A stat callback that treats NONODE as success. 
+ */ + static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback() { + @Override + @SuppressWarnings("unchecked") + public void processResult(int rc, String path, Object ctx, Stat stat) { + SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx; + KeeperException.Code code = KeeperException.Code.get(rc); + if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) { + result.set(stat); + return; + } + result.setException(KeeperException.create(code, result.getRequestPath())); + } + }; + + static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback() { + @Override + @SuppressWarnings("unchecked") + public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + SettableOperationFuture<NodeChildren> result = (SettableOperationFuture<NodeChildren>) ctx; + KeeperException.Code code = KeeperException.Code.get(rc); + if (code == KeeperException.Code.OK) { + result.set(new BasicNodeChildren(children, stat)); + return; + } + result.setException(KeeperException.create(code, result.getRequestPath())); + } + }; + + static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback() { + @Override + @SuppressWarnings("unchecked") + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + SettableOperationFuture<NodeData> result = (SettableOperationFuture<NodeData>) ctx; + KeeperException.Code code = KeeperException.Code.get(rc); + if (code == KeeperException.Code.OK) { + result.set(new BasicNodeData(data, stat)); + return; + } + result.setException(KeeperException.create(code, result.getRequestPath())); + } + }; + + static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback() { + @Override + @SuppressWarnings("unchecked") + public void processResult(int rc, String path, Object ctx) { + SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx; + KeeperException.Code code 
= KeeperException.Code.get(rc); + if (code == KeeperException.Code.OK) { + result.set(result.getRequestPath()); + return; + } + // Otherwise, it is an error + result.setException(KeeperException.create(code, result.getRequestPath())); + } + }; + + static final AsyncCallback.ACLCallback ACL = new AsyncCallback.ACLCallback() { + @Override + @SuppressWarnings("unchecked") + public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) { + SettableOperationFuture<ACLData> result = (SettableOperationFuture<ACLData>) ctx; + KeeperException.Code code = KeeperException.Code.get(rc); + if (code == KeeperException.Code.OK) { + result.set(new BasicACLData(acl, stat)); + return; + } + result.setException(KeeperException.create(code, result.getRequestPath())); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/thrift/transaction.thrift ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift index 39e6cec..0e3d712 100644 --- a/tephra-core/src/main/thrift/transaction.thrift +++ b/tephra-core/src/main/thrift/transaction.thrift @@ -16,7 +16,7 @@ # limitations under the License. 
# -namespace java co.cask.tephra.distributed.thrift +namespace java org.apache.tephra.distributed.thrift enum TTransactionType { SHORT = 1, http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/ThriftTransactionSystemTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/co/cask/tephra/ThriftTransactionSystemTest.java deleted file mode 100644 index cf01d25..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/ThriftTransactionSystemTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package co.cask.tephra; - -import co.cask.tephra.distributed.TransactionService; -import co.cask.tephra.persist.InMemoryTransactionStateStorage; -import co.cask.tephra.persist.TransactionStateStorage; -import co.cask.tephra.runtime.ConfigModule; -import co.cask.tephra.runtime.DiscoveryModules; -import co.cask.tephra.runtime.TransactionClientModule; -import co.cask.tephra.runtime.TransactionModules; -import co.cask.tephra.runtime.ZKModule; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Scopes; -import com.google.inject.util.Modules; -import org.apache.hadoop.conf.Configuration; -import org.apache.twill.internal.zookeeper.InMemoryZKServer; -import org.apache.twill.zookeeper.ZKClientService; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ThriftTransactionSystemTest extends TransactionSystemTest { - private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class); - - private static InMemoryZKServer zkServer; - private static ZKClientService zkClientService; - private static TransactionService txService; - private static TransactionStateStorage storage; - private static TransactionSystemClient txClient; - - @ClassRule - public static TemporaryFolder tmpFolder = new TemporaryFolder(); - - @BeforeClass - public static void start() throws Exception { - zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); - - Configuration conf = new Configuration(); - conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); - conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr()); - conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); - 
conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); - - Injector injector = Guice.createInjector( - new ConfigModule(conf), - new ZKModule(), - new DiscoveryModules().getDistributedModules(), - Modules.override(new TransactionModules().getDistributedModules()) - .with(new AbstractModule() { - @Override - protected void configure() { - bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON); - } - }), - new TransactionClientModule() - ); - - zkClientService = injector.getInstance(ZKClientService.class); - zkClientService.startAndWait(); - - // start a tx server - txService = injector.getInstance(TransactionService.class); - storage = injector.getInstance(TransactionStateStorage.class); - txClient = injector.getInstance(TransactionSystemClient.class); - try { - LOG.info("Starting transaction service"); - txService.startAndWait(); - } catch (Exception e) { - LOG.error("Failed to start service: ", e); - } - } - - @Before - public void reset() throws Exception { - getClient().resetState(); - } - - @AfterClass - public static void stop() throws Exception { - txService.stopAndWait(); - storage.stopAndWait(); - zkClientService.stopAndWait(); - zkServer.stopAndWait(); - } - - @Override - protected TransactionSystemClient getClient() throws Exception { - return txClient; - } - - @Override - protected TransactionStateStorage getStateStorage() throws Exception { - return storage; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionAdminTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionAdminTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionAdminTest.java deleted file mode 100644 index de02f80..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/TransactionAdminTest.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package co.cask.tephra;

import co.cask.tephra.distributed.TransactionService;
import co.cask.tephra.persist.InMemoryTransactionStateStorage;
import co.cask.tephra.persist.TransactionStateStorage;
import co.cask.tephra.runtime.ConfigModule;
import co.cask.tephra.runtime.DiscoveryModules;
import co.cask.tephra.runtime.TransactionClientModule;
import co.cask.tephra.runtime.TransactionModules;
import co.cask.tephra.runtime.ZKModule;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Scopes;
import com.google.inject.util.Modules;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.zookeeper.ZKClientService;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.concurrent.TimeUnit;

/**
 * Tests for the {@link TransactionAdmin} command line tool.
 *
 * <p>The fixture starts an in-memory ZooKeeper server, a ZooKeeper client and a
 * {@link TransactionService} once per class, and resets the transaction state through the
 * {@link TransactionSystemClient} before every test.
 */
public class TransactionAdminTest {
  private static final Logger LOG = LoggerFactory.getLogger(TransactionAdminTest.class);

  /** Charset used both to encode captured tool output and to decode it for assertions. */
  private static final String UTF_8 = "UTF-8";

  private static Configuration conf;
  private static InMemoryZKServer zkServer;
  private static ZKClientService zkClientService;
  private static TransactionService txService;
  private static TransactionSystemClient txClient;

  @ClassRule
  public static TemporaryFolder tmpFolder = new TemporaryFolder();

  /**
   * Starts the in-memory ZooKeeper server, builds the injector and starts the transaction service.
   *
   * @throws Exception if any of the services fails to start; the failure is propagated so that the
   *     tests do not run against a service that never came up
   */
  @BeforeClass
  public static void start() throws Exception {
    zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
    zkServer.startAndWait();

    conf = new Configuration();
    conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
    conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
    conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
    conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);

    Injector injector = Guice.createInjector(
      new ConfigModule(conf),
      new ZKModule(),
      new DiscoveryModules().getDistributedModules(),
      Modules.override(new TransactionModules().getDistributedModules())
        .with(new AbstractModule() {
          @Override
          protected void configure() {
            // Swap the distributed state storage binding for the in-memory implementation.
            bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
          }
        }),
      new TransactionClientModule()
    );

    zkClientService = injector.getInstance(ZKClientService.class);
    zkClientService.startAndWait();

    // start a tx server
    txService = injector.getInstance(TransactionService.class);
    txClient = injector.getInstance(TransactionSystemClient.class);
    try {
      LOG.info("Starting transaction service");
      txService.startAndWait();
    } catch (Exception e) {
      LOG.error("Failed to start service: ", e);
      // Fail the fixture here instead of letting every test fail later with an unrelated error.
      throw e;
    }
  }

  /** Resets the transaction manager state so that each test starts from an empty invalid list. */
  @Before
  public void reset() throws Exception {
    txClient.resetState();
  }

  /**
   * Stops the services in the reverse order of their start-up.
   *
   * <p>Each stop is attempted even if an earlier one throws, and services that were never created
   * (because {@link #start()} failed part-way) are skipped.
   */
  @AfterClass
  public static void stop() throws Exception {
    try {
      if (txService != null) {
        txService.stopAndWait();
      }
    } finally {
      try {
        if (zkClientService != null) {
          zkClientService.stopAndWait();
        }
      } finally {
        if (zkServer != null) {
          zkServer.stopAndWait();
        }
      }
    }
  }

  /** Running the tool without arguments must exit with status 1 and print usage to stderr only. */
  @Test
  public void testPrintUsage() throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ByteArrayOutputStream err = new ByteArrayOutputStream();
    // Encode with the same charset that is used to decode the captured bytes below.
    TransactionAdmin txAdmin =
      new TransactionAdmin(new PrintStream(out, true, UTF_8), new PrintStream(err, true, UTF_8));
    int status = txAdmin.doMain(new String[0], conf);
    Assert.assertEquals(1, status);
    //noinspection ConstantConditions
    Assert.assertTrue(err.toString(UTF_8).startsWith("Usage:"));
    Assert.assertEquals(0, out.toByteArray().length);
  }

  /**
   * Invalidates two transactions, then runs {@code --truncate-invalid-tx} with the id of one of
   * them and expects the invalid list to shrink from two entries to one.
   */
  @Test
  public void testTruncateInvalidTx() throws Exception {
    Transaction tx1 = txClient.startLong();
    Transaction tx2 = txClient.startShort();
    txClient.invalidate(tx1.getTransactionId());
    txClient.invalidate(tx2.getTransactionId());
    Assert.assertEquals(2, txClient.getInvalidSize());

    TransactionAdmin txAdmin = new TransactionAdmin(System.out, System.err);
    int status = txAdmin.doMain(new String[]{"--truncate-invalid-tx", String.valueOf(tx2.getTransactionId())}, conf);
    Assert.assertEquals(0, status);
    Assert.assertEquals(1, txClient.getInvalidSize());
  }

  /**
   * Runs {@code --truncate-invalid-tx-before} with a timestamp taken between the start of two long
   * transactions: first while both are still in progress (expected to fail and change nothing),
   * then after both were invalidated (expected to succeed and leave one invalid entry).
   */
  @Test
  public void testTruncateInvalidTxBefore() throws Exception {
    Transaction tx1 = txClient.startLong();
    // Make sure the wall clock has advanced before the cut-off timestamp is taken.
    TimeUnit.MILLISECONDS.sleep(1);
    long beforeTx2 = System.currentTimeMillis();
    Transaction tx2 = txClient.startLong();

    // Try before invalidation
    Assert.assertEquals(0, txClient.getInvalidSize());
    TransactionAdmin txAdmin = new TransactionAdmin(System.out, System.err);
    int status = txAdmin.doMain(new String[]{"--truncate-invalid-tx-before", String.valueOf(beforeTx2)}, conf);
    // Assert command failed due to in-progress transactions
    Assert.assertEquals(1, status);
    // Assert no change to invalid size
    Assert.assertEquals(0, txClient.getInvalidSize());

    txClient.invalidate(tx1.getTransactionId());
    txClient.invalidate(tx2.getTransactionId());
    Assert.assertEquals(2, txClient.getInvalidSize());

    status = txAdmin.doMain(new String[]{"--truncate-invalid-tx-before", String.valueOf(beforeTx2)}, conf);
    Assert.assertEquals(0, status);
    Assert.assertEquals(1, txClient.getInvalidSize());
  }

  /**
   * Invalidates one of two started transactions and expects {@code --get-invalid-tx-size} to exit
   * with status 0 and report an invalid list size of 1 on stdout.
   */
  @Test
  public void testGetInvalidTxSize() throws Exception {
    Transaction tx1 = txClient.startShort();
    txClient.startLong();
    txClient.invalidate(tx1.getTransactionId());

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ByteArrayOutputStream err = new ByteArrayOutputStream();
    // Encode with the same charset that is used to decode the captured bytes below.
    TransactionAdmin txAdmin =
      new TransactionAdmin(new PrintStream(out, true, UTF_8), new PrintStream(err, true, UTF_8));
    int status = txAdmin.doMain(new String[]{"--get-invalid-tx-size"}, conf);
    Assert.assertEquals(0, status);
    // NOTE(review): the expected text hard-codes '\n'; presumably TransactionAdmin emits a
    // platform-independent line ending here - confirm against its implementation.
    //noinspection ConstantConditions
    Assert.assertTrue(out.toString(UTF_8).contains("Invalid list size: 1\n"));
  }
}
