Repository: incubator-tephra Updated Branches: refs/heads/feature/backward-compat bc86dae1f -> afe31fb60 (forced update)
Copying old classes to provide backward compatibility with TransactionEdit Create a v3 TxLogReader and deprecate v2 which contains logs written with old cask classes Fix BalanceBooks example run command Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/afe31fb6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/afe31fb6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/afe31fb6 Branch: refs/heads/feature/backward-compat Commit: afe31fb60116f6260e94ba64b3e619ea6c96b084 Parents: ab52491 Author: Gokul Gunasekaran <[email protected]> Authored: Thu May 12 10:57:04 2016 -0700 Committer: Gokul Gunasekaran <[email protected]> Committed: Thu May 12 13:44:12 2016 -0700 ---------------------------------------------------------------------- .../co/cask/tephra/persist/TransactionEdit.java | 362 +++++++++++++++++++ .../tephra/persist/TransactionEditCodecs.java | 313 ++++++++++++++++ .../java/org/apache/tephra/TxConstants.java | 2 +- .../tephra/persist/AbstractTransactionLog.java | 40 ++ .../HDFSTransactionLogReaderSupplier.java | 3 + .../persist/HDFSTransactionLogReaderV1.java | 5 +- .../persist/HDFSTransactionLogReaderV2.java | 8 +- .../persist/HDFSTransactionLogReaderV3.java | 114 ++++++ .../apache/tephra/persist/TransactionEdit.java | 11 + .../tephra/persist/HDFSTransactionLogTest.java | 127 ++++++- .../apache/tephra/util/TransactionEditUtil.java | 34 +- .../apache/tephra/examples/BalanceBooks.java | 2 +- 12 files changed, 983 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/afe31fb6/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java 
b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java new file mode 100644 index 0000000..c76061f --- /dev/null +++ b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package co.cask.tephra.persist; + +import com.google.common.base.Objects; +import com.google.common.collect.Sets; +import org.apache.hadoop.io.Writable; +import org.apache.tephra.ChangeId; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionType; +import org.apache.tephra.persist.TransactionLog; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; + +/** + * Represents a transaction state change in the {@link TransactionLog}. + */ +public class TransactionEdit implements Writable { + + /** + * The possible state changes for a transaction. 
+ */ + public enum State { + INPROGRESS, COMMITTING, COMMITTED, INVALID, ABORTED, MOVE_WATERMARK, TRUNCATE_INVALID_TX, CHECKPOINT + } + + private long writePointer; + + /** + * stores the value of visibility upper bound + * (see {@link TransactionManager.InProgressTx#getVisibilityUpperBound()}) + * for edit of {@link State#INPROGRESS} only + */ + private long visibilityUpperBound; + private long commitPointer; + private long expirationDate; + private State state; + private Set<ChangeId> changes; + /** Whether or not the COMMITTED change should be fully committed. */ + private boolean canCommit; + private TransactionType type; + private Set<Long> truncateInvalidTx; + private long truncateInvalidTxTime; + private long parentWritePointer; + private long[] checkpointPointers; + + // for Writable + public TransactionEdit() { + this.changes = Sets.newHashSet(); + this.truncateInvalidTx = Sets.newHashSet(); + } + + // package private for testing + TransactionEdit(long writePointer, long visibilityUpperBound, State state, long expirationDate, + Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type, + Set<Long> truncateInvalidTx, long truncateInvalidTxTime, long parentWritePointer, + long[] checkpointPointers) { + this.writePointer = writePointer; + this.visibilityUpperBound = visibilityUpperBound; + this.state = state; + this.expirationDate = expirationDate; + this.changes = changes != null ? changes : Collections.<ChangeId>emptySet(); + this.commitPointer = commitPointer; + this.canCommit = canCommit; + this.type = type; + this.truncateInvalidTx = truncateInvalidTx != null ? truncateInvalidTx : Collections.<Long>emptySet(); + this.truncateInvalidTxTime = truncateInvalidTxTime; + this.parentWritePointer = parentWritePointer; + this.checkpointPointers = checkpointPointers; + } + + /** + * Returns the transaction write pointer assigned for the state change. 
+ */ + public long getWritePointer() { + return writePointer; + } + + void setWritePointer(long writePointer) { + this.writePointer = writePointer; + } + + public long getVisibilityUpperBound() { + return visibilityUpperBound; + } + + void setVisibilityUpperBound(long visibilityUpperBound) { + this.visibilityUpperBound = visibilityUpperBound; + } + + /** + * Returns the type of state change represented. + */ + public State getState() { + return state; + } + + void setState(State state) { + this.state = state; + } + + /** + * Returns any expiration timestamp (in milliseconds) associated with the state change. This should only + * be populated for changes of type {@link State#INPROGRESS}. + */ + public long getExpiration() { + return expirationDate; + } + + void setExpiration(long expirationDate) { + this.expirationDate = expirationDate; + } + + /** + * @return the set of changed row keys associated with the state change. This is only populated for edits + * of type {@link State#COMMITTING} or {@link State#COMMITTED}. + */ + public Set<ChangeId> getChanges() { + return changes; + } + + void setChanges(Set<ChangeId> changes) { + this.changes = changes; + } + + /** + * Returns the write pointer used to commit the row key change set. This is only populated for edits of type + * {@link State#COMMITTED}. + */ + public long getCommitPointer() { + return commitPointer; + } + + void setCommitPointer(long commitPointer) { + this.commitPointer = commitPointer; + } + + /** + * Returns whether or not the transaction should be moved to the committed set. This is only populated for edits + * of type {@link State#COMMITTED}. + */ + public boolean getCanCommit() { + return canCommit; + } + + void setCanCommit(boolean canCommit) { + this.canCommit = canCommit; + } + + /** + * Returns the transaction type. This is only populated for edits of type {@link State#INPROGRESS} or + * {@link State#ABORTED}. 
+ */ + public TransactionType getType() { + return type; + } + + void setType(TransactionType type) { + this.type = type; + } + + /** + * Returns the transaction ids to be removed from invalid transaction list. This is only populated for + * edits of type {@link State#TRUNCATE_INVALID_TX} + */ + public Set<Long> getTruncateInvalidTx() { + return truncateInvalidTx; + } + + void setTruncateInvalidTx(Set<Long> truncateInvalidTx) { + this.truncateInvalidTx = truncateInvalidTx; + } + + /** + * Returns the time until which the invalid transactions need to be truncated from invalid transaction list. + * This is only populated for edits of type {@link State#TRUNCATE_INVALID_TX} + */ + public long getTruncateInvalidTxTime() { + return truncateInvalidTxTime; + } + + void setTruncateInvalidTxTime(long truncateInvalidTxTime) { + this.truncateInvalidTxTime = truncateInvalidTxTime; + } + + /** + * Returns the parent write pointer for a checkpoint operation. This is only populated for edits of type + * {@link State#CHECKPOINT} + */ + public long getParentWritePointer() { + return parentWritePointer; + } + + void setParentWritePointer(long parentWritePointer) { + this.parentWritePointer = parentWritePointer; + } + + /** + * Returns the checkpoint write pointers for the edit. This is only populated for edits of type + * {@link State#ABORTED}. + */ + public long[] getCheckpointPointers() { + return checkpointPointers; + } + + void setCheckpointPointers(long[] checkpointPointers) { + this.checkpointPointers = checkpointPointers; + } + + /** + * Creates a new instance in the {@link State#INPROGRESS} state. + */ + public static TransactionEdit createStarted(long writePointer, long visibilityUpperBound, + long expirationDate, TransactionType type) { + return new TransactionEdit(writePointer, visibilityUpperBound, State.INPROGRESS, + expirationDate, null, 0L, false, type, null, 0L, 0L, null); + } + + /** + * Creates a new instance in the {@link State#COMMITTING} state. 
+ */ + public static TransactionEdit createCommitting(long writePointer, Set<ChangeId> changes) { + return new TransactionEdit(writePointer, 0L, State.COMMITTING, 0L, changes, 0L, false, null, null, 0L, 0L, null); + } + + /** + * Creates a new instance in the {@link State#COMMITTED} state. + */ + public static TransactionEdit createCommitted(long writePointer, Set<ChangeId> changes, long nextWritePointer, + boolean canCommit) { + return new TransactionEdit(writePointer, 0L, State.COMMITTED, 0L, changes, nextWritePointer, canCommit, null, + null, 0L, 0L, null); + } + + /** + * Creates a new instance in the {@link State#ABORTED} state. + */ + public static TransactionEdit createAborted(long writePointer, TransactionType type, long[] checkpointPointers) { + return new TransactionEdit(writePointer, 0L, State.ABORTED, 0L, null, 0L, false, type, null, 0L, 0L, + checkpointPointers); + } + + /** + * Creates a new instance in the {@link State#INVALID} state. + */ + public static TransactionEdit createInvalid(long writePointer) { + return new TransactionEdit(writePointer, 0L, State.INVALID, 0L, null, 0L, false, null, null, 0L, 0L, null); + } + + /** + * Creates a new instance in the {@link State#MOVE_WATERMARK} state. + */ + public static TransactionEdit createMoveWatermark(long writePointer) { + return new TransactionEdit(writePointer, 0L, State.MOVE_WATERMARK, 0L, null, 0L, false, null, null, 0L, 0L, null); + } + + /** + * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state. + */ + public static TransactionEdit createTruncateInvalidTx(Set<Long> truncateInvalidTx) { + return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, truncateInvalidTx, + 0L, 0L, null); + } + + /** + * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state. 
+ */ + public static TransactionEdit createTruncateInvalidTxBefore(long truncateInvalidTxTime) { + return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, null, + truncateInvalidTxTime, 0L, null); + } + + /** + * Creates a new instance in the {@link State#CHECKPOINT} state. + */ + public static TransactionEdit createCheckpoint(long writePointer, long parentWritePointer) { + return new TransactionEdit(writePointer, 0L, State.CHECKPOINT, 0L, null, 0L, false, null, null, 0L, + parentWritePointer, null); + } + + @Override + public void write(DataOutput out) throws IOException { + TransactionEditCodecs.encode(this, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + TransactionEditCodecs.decode(this, in); + } + + @Override + public final boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TransactionEdit)) { + return false; + } + + TransactionEdit that = (TransactionEdit) o; + + return Objects.equal(this.writePointer, that.writePointer) && + Objects.equal(this.visibilityUpperBound, that.visibilityUpperBound) && + Objects.equal(this.commitPointer, that.commitPointer) && + Objects.equal(this.expirationDate, that.expirationDate) && + Objects.equal(this.state, that.state) && + Objects.equal(this.changes, that.changes) && + Objects.equal(this.canCommit, that.canCommit) && + Objects.equal(this.type, that.type) && + Objects.equal(this.truncateInvalidTx, that.truncateInvalidTx) && + Objects.equal(this.truncateInvalidTxTime, that.truncateInvalidTxTime) && + Objects.equal(this.parentWritePointer, that.parentWritePointer) && + Arrays.equals(this.checkpointPointers, that.checkpointPointers); + } + + @Override + public final int hashCode() { + return Objects.hashCode(writePointer, visibilityUpperBound, commitPointer, expirationDate, state, changes, + canCommit, type, parentWritePointer, Arrays.hashCode(checkpointPointers)); // array hashed by content, as in equals() + } + + @Override + public String toString() { + return 
Objects.toStringHelper(this) + .add("writePointer", writePointer) + .add("visibilityUpperBound", visibilityUpperBound) + .add("commitPointer", commitPointer) + .add("expiration", expirationDate) + .add("state", state) + .add("changesSize", changes != null ? changes.size() : 0) + .add("canCommit", canCommit) + .add("type", type) + .add("truncateInvalidTx", truncateInvalidTx) + .add("truncateInvalidTxTime", truncateInvalidTxTime) + .add("parentWritePointer", parentWritePointer) + .add("checkpointPointers", checkpointPointers) + .toString(); + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/afe31fb6/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java new file mode 100644 index 0000000..bf4d544 --- /dev/null +++ b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package co.cask.tephra.persist; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import org.apache.tephra.ChangeId; +import org.apache.tephra.TransactionType; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * Utilities to handle encoding and decoding of {@link TransactionEdit} entries, while maintaining compatibility + * with older versions of the serialized data. + */ +public class TransactionEditCodecs { + + private static final TransactionEditCodec[] ALL_CODECS = { + new TransactionEditCodecV1(), + new TransactionEditCodecV2(), + new TransactionEditCodecV3(), + new TransactionEditCodecV4() + }; + + private static final SortedMap<Byte, TransactionEditCodec> CODECS = new TreeMap<>(); + static { + for (TransactionEditCodec codec : ALL_CODECS) { + CODECS.put(codec.getVersion(), codec); + } + } + + /** + * Deserializes the encoded data from the given input stream, setting the values as fields + * on the given {@code TransactionEdit} instance. This method expects the first value in the + * {@code DataInput} to be a byte representing the codec version used to serialize the instance. + * + * @param dest the transaction edit to populate with the deserialized values + * @param in the input stream containing the encoded data + * @throws IOException if an error occurs while deserializing from the input stream + */ + public static void decode(TransactionEdit dest, DataInput in) throws IOException { + byte version = in.readByte(); + TransactionEditCodec codec = CODECS.get(version); + if (codec == null) { + throw new IOException("TransactionEdit was serialized with an unknown codec version " + version + + ". Was it written with a newer version of Tephra?"); + } + codec.decode(dest, in); + } + + /** + * Serializes the given {@code TransactionEdit} instance with the latest available codec. 
+ * This will first write out the version of the codec used to serialize the instance so that + * the correct codec can be used when calling {@link #decode(TransactionEdit, DataInput)}. + * + * @param src the transaction edit to serialize + * @param out the output stream to contain the data + * @throws IOException if an error occurs while serializing to the output stream + */ + public static void encode(TransactionEdit src, DataOutput out) throws IOException { + TransactionEditCodec latestCodec = CODECS.get(CODECS.firstKey()); + out.writeByte(latestCodec.getVersion()); + latestCodec.encode(src, out); + } + + /** + * Encodes the given transaction edit using a specific codec. Note that this is only exposed + * for use by tests. + */ + @VisibleForTesting + static void encode(TransactionEdit src, DataOutput out, TransactionEditCodec codec) throws IOException { + out.writeByte(codec.getVersion()); + codec.encode(src, out); + } + + /** + * Defines the interface used for encoding and decoding {@link TransactionEdit} instances to and from + * binary representations. + */ + interface TransactionEditCodec { + /** + * Reads the encoded values from the data input stream and sets the fields in the given {@code TransactionEdit} + * instance. + * + * @param dest the instance on which to set all the deserialized values + * @param in the input stream containing the serialized data + * @throws IOException if an error occurs while deserializing the data + */ + void decode(TransactionEdit dest, DataInput in) throws IOException; + + /** + * Writes all the field values from the {@code TransactionEdit} instance in serialized form to the data + * output stream. + * + * @param src the instance to serialize to the stream + * @param out the output stream to contain the data + * @throws IOException if an error occurs while serializing the data + */ + void encode(TransactionEdit src, DataOutput out) throws IOException; + + /** + * Returns the version number for this codec. 
Each codec should use a unique version number, with the newest + * codec having the lowest number. + */ + byte getVersion(); + } + + + // package-private for unit-test access + static class TransactionEditCodecV1 implements TransactionEditCodec { + @Override + public void decode(TransactionEdit dest, DataInput in) throws IOException { + dest.setWritePointer(in.readLong()); + int stateIdx = in.readInt(); + try { + dest.setState(TransactionEdit.State.values()[stateIdx]); + } catch (ArrayIndexOutOfBoundsException e) { + throw new IOException("State enum ordinal value is out of range: " + stateIdx); + } + dest.setExpiration(in.readLong()); + dest.setCommitPointer(in.readLong()); + dest.setCanCommit(in.readBoolean()); + int changeSize = in.readInt(); + Set<ChangeId> changes = Sets.newHashSet(); + for (int i = 0; i < changeSize; i++) { + int currentLength = in.readInt(); + byte[] currentBytes = new byte[currentLength]; + in.readFully(currentBytes); + changes.add(new ChangeId(currentBytes)); + } + dest.setChanges(changes); + // 1st version did not store this info. It is safe to set firstInProgress to 0, it may decrease performance until + // this tx is finished, but correctness will be preserved. 
+ dest.setVisibilityUpperBound(0); + } + + /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for + * unit-tests only */ + @Override + @Deprecated + public void encode(TransactionEdit src, DataOutput out) throws IOException { + out.writeLong(src.getWritePointer()); + // use ordinal for predictable size, though this does not support evolution + out.writeInt(src.getState().ordinal()); + out.writeLong(src.getExpiration()); + out.writeLong(src.getCommitPointer()); + out.writeBoolean(src.getCanCommit()); + Set<ChangeId> changes = src.getChanges(); + if (changes == null) { + out.writeInt(0); + } else { + out.writeInt(changes.size()); + for (ChangeId c : changes) { + byte[] cKey = c.getKey(); + out.writeInt(cKey.length); + out.write(cKey); + } + } + // NOTE: we didn't have visibilityUpperBound in V1, it was added in V2 + // we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2, + // it was added in V3 + } + + @Override + public byte getVersion() { + return -1; + } + } + + // package-private for unit-test access + static class TransactionEditCodecV2 extends TransactionEditCodecV1 implements TransactionEditCodec { + @Override + public void decode(TransactionEdit dest, DataInput in) throws IOException { + super.decode(dest, in); + dest.setVisibilityUpperBound(in.readLong()); + } + + /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for + * unit-tests only */ + @Override + public void encode(TransactionEdit src, DataOutput out) throws IOException { + super.encode(src, out); + out.writeLong(src.getVisibilityUpperBound()); + // NOTE: we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2, + // it was added in V3 + } + + @Override + public byte getVersion() { + return -2; + } + } + + // TODO: refactor to avoid duplicate code among different version of codecs + // package-private for unit-test access + static class 
TransactionEditCodecV3 extends TransactionEditCodecV2 implements TransactionEditCodec { + @Override + public void decode(TransactionEdit dest, DataInput in) throws IOException { + super.decode(dest, in); + int typeIdx = in.readInt(); + // null transaction type is represented as -1 + if (typeIdx < 0) { + dest.setType(null); + } else { + try { + dest.setType(TransactionType.values()[typeIdx]); + } catch (ArrayIndexOutOfBoundsException e) { + throw new IOException("Type enum ordinal value is out of range: " + typeIdx); + } + } + + int truncateInvalidTxSize = in.readInt(); + Set<Long> truncateInvalidTx = emptySet(dest.getTruncateInvalidTx()); + for (int i = 0; i < truncateInvalidTxSize; i++) { + truncateInvalidTx.add(in.readLong()); + } + dest.setTruncateInvalidTx(truncateInvalidTx); + dest.setTruncateInvalidTxTime(in.readLong()); + } + + private <T> Set<T> emptySet(Set<T> set) { + if (set == null) { + return Sets.newHashSet(); + } + set.clear(); + return set; + } + + @Override + public void encode(TransactionEdit src, DataOutput out) throws IOException { + super.encode(src, out); + // null transaction type is represented as -1 + if (src.getType() == null) { + out.writeInt(-1); + } else { + out.writeInt(src.getType().ordinal()); + } + + Set<Long> truncateInvalidTx = src.getTruncateInvalidTx(); + if (truncateInvalidTx == null) { + out.writeInt(0); + } else { + out.writeInt(truncateInvalidTx.size()); + for (long id : truncateInvalidTx) { + out.writeLong(id); + } + } + out.writeLong(src.getTruncateInvalidTxTime()); + } + + @Override + public byte getVersion() { + return -3; + } + } + + static class TransactionEditCodecV4 extends TransactionEditCodecV3 { + @Override + public void decode(TransactionEdit dest, DataInput in) throws IOException { + super.decode(dest, in); + dest.setParentWritePointer(in.readLong()); + int checkpointPointersLen = in.readInt(); + if (checkpointPointersLen >= 0) { + long[] checkpointPointers = new long[checkpointPointersLen]; + for (int i = 0; i 
< checkpointPointersLen; i++) { + checkpointPointers[i] = in.readLong(); + } + dest.setCheckpointPointers(checkpointPointers); + } + } + + @Override + public void encode(TransactionEdit src, DataOutput out) throws IOException { + super.encode(src, out); + out.writeLong(src.getParentWritePointer()); + long[] checkpointPointers = src.getCheckpointPointers(); + if (checkpointPointers == null) { + out.writeInt(-1); + } else { + out.writeInt(checkpointPointers.length); + for (int i = 0; i < checkpointPointers.length; i++) { + out.writeLong(checkpointPointers[i]); + } + } + } + + @Override + public byte getVersion() { + return -4; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/afe31fb6/tephra-core/src/main/java/org/apache/tephra/TxConstants.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java index 61ee3cc..1c4fafc 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java +++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java @@ -345,7 +345,7 @@ public class TxConstants { */ public static final String NUM_ENTRIES_APPENDED = "count"; public static final String VERSION_KEY = "version"; - public static final byte CURRENT_VERSION = 2; + public static final byte CURRENT_VERSION = 3; } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/afe31fb6/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java index b1e0978..93aeef7 100644 --- a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java +++ 
b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java @@ -18,6 +18,7 @@ package org.apache.tephra.persist; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; @@ -248,4 +249,43 @@ public abstract class AbstractTransactionLog implements TransactionLog { this.edit.readFields(in); } } + + // package private for testing + @VisibleForTesting + static class CaskEntry implements Writable { + private LongWritable key; + private co.cask.tephra.persist.TransactionEdit edit; + + + // for Writable + public CaskEntry() { + this.key = new LongWritable(); + this.edit = new co.cask.tephra.persist.TransactionEdit(); + } + + public CaskEntry(LongWritable key, co.cask.tephra.persist.TransactionEdit edit) { + this.key = key; + this.edit = edit; + } + + public LongWritable getKey() { + return this.key; + } + + public co.cask.tephra.persist.TransactionEdit getEdit() { + return this.edit; + } + + @Override + public void write(DataOutput out) throws IOException { + this.key.write(out); + this.edit.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.key.readFields(in); + this.edit.readFields(in); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/afe31fb6/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java index a517903..1bddc31 100644 --- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java +++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java @@ -45,6 +45,9 @@ public class 
HDFSTransactionLogReaderSupplier implements Supplier<TransactionLog } switch (version) { + case 3: + logReader = new HDFSTransactionLogReaderV3(reader); + return logReader; case 2: logReader = new HDFSTransactionLogReaderV2(reader); return logReader; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/afe31fb6/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java index faefaec..38b74d8 100644 --- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java +++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java @@ -53,8 +53,9 @@ public class HDFSTransactionLogReaderV1 implements TransactionLogReader { } try { - boolean successful = reader.next(key, reuse); - return successful ? reuse : null; + co.cask.tephra.persist.TransactionEdit oldTxEdit = new co.cask.tephra.persist.TransactionEdit(); + boolean successful = reader.next(key, oldTxEdit); + return successful ? TransactionEdit.convertCaskTxEdit(oldTxEdit) : null; } catch (EOFException e) { LOG.warn("Hit an unexpected EOF while trying to read the Transaction Edit. 
Skipping the entry.", e); return null; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/afe31fb6/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java index ce50da8..e371b98 100644 --- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java +++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java @@ -38,7 +38,7 @@ public class HDFSTransactionLogReaderV2 implements TransactionLogReader { private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class); private final SequenceFile.Reader reader; - private final Queue<TransactionEdit> transactionEdits; + private final Queue<co.cask.tephra.persist.TransactionEdit> transactionEdits; private final CommitMarkerCodec commitMarkerCodec; private final LongWritable key; @@ -76,12 +76,12 @@ public class HDFSTransactionLogReaderV2 implements TransactionLogReader { } if (!transactionEdits.isEmpty()) { - return transactionEdits.remove(); + return TransactionEdit.convertCaskTxEdit(transactionEdits.remove()); } // Fetch the 'marker' and read 'marker' number of edits populateTransactionEdits(); - return transactionEdits.poll(); + return TransactionEdit.convertCaskTxEdit(transactionEdits.poll()); } private void populateTransactionEdits() throws IOException { @@ -96,7 +96,7 @@ public class HDFSTransactionLogReaderV2 implements TransactionLogReader { } for (int i = 0; i < numEntries; i++) { - TransactionEdit edit = new TransactionEdit(); + co.cask.tephra.persist.TransactionEdit edit = new co.cask.tephra.persist.TransactionEdit(); try { if (reader.next(key, edit)) { transactionEdits.add(edit); 
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/afe31fb6/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java new file mode 100644 index 0000000..3670e3f --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; + +/** + * {@link TransactionLogReader} that can read v3 version of Transaction logs. The logs are expected to + * have commit markers that indicates the size of the batch of {@link TransactionEdit}s (follows the marker), + * that were synced together. 
If the expected number of {@link TransactionEdit}s are not present then that set of + * {@link TransactionEdit}s are discarded. + */ +public class HDFSTransactionLogReaderV3 implements TransactionLogReader { + private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class); + + private final SequenceFile.Reader reader; + private final Queue<TransactionEdit> transactionEdits; + private final CommitMarkerCodec commitMarkerCodec; + private final LongWritable key; + + private boolean closed; + + public HDFSTransactionLogReaderV3(SequenceFile.Reader reader) { + this.reader = reader; + this.transactionEdits = new ArrayDeque<>(); + this.key = new LongWritable(); + this.commitMarkerCodec = new CommitMarkerCodec(); + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + try { + commitMarkerCodec.close(); + } finally { + reader.close(); + closed = true; + } + } + + @Override + public TransactionEdit next() throws IOException { + return next(null); + } + + @Override + public TransactionEdit next(TransactionEdit reuse) throws IOException { + if (closed) { + return null; + } + + if (!transactionEdits.isEmpty()) { + return transactionEdits.remove(); + } + + // Fetch the 'marker' and read 'marker' number of edits + populateTransactionEdits(); + return transactionEdits.poll(); + } + + private void populateTransactionEdits() throws IOException { + // read the marker to determine numEntries to read. 
+ int numEntries = 0; + try { + // can throw EOFException if reading of incomplete commit marker, no other action required since we can safely + // ignore this + numEntries = commitMarkerCodec.readMarker(reader); + } catch (EOFException e) { + LOG.warn("Reached EOF in log while trying to read commit marker", e); + } + + for (int i = 0; i < numEntries; i++) { + TransactionEdit edit = new TransactionEdit(); + try { + if (reader.next(key, edit)) { + transactionEdits.add(edit); + } else { + throw new EOFException("Attempt to read TransactionEdit failed."); + } + } catch (EOFException e) { + // we have reached EOF before reading back numEntries, we clear the partial list and return. + LOG.warn("Reached EOF in log before reading {} entries. Ignoring all {} edits since the last marker", + numEntries, transactionEdits.size(), e); + transactionEdits.clear(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/afe31fb6/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java index 1d07e72..3555a84 100644 --- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java +++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java @@ -298,6 +298,17 @@ public class TransactionEdit implements Writable { parentWritePointer, null); } + public static TransactionEdit convertCaskTxEdit(co.cask.tephra.persist.TransactionEdit txEdit) { + if (txEdit == null) { + return null; + } + return new TransactionEdit(txEdit.getWritePointer(), txEdit.getVisibilityUpperBound(), + TransactionEdit.State.valueOf(txEdit.getState().toString()), txEdit.getExpiration(), + txEdit.getChanges(), txEdit.getCommitPointer(), txEdit.getCanCommit(), txEdit.getType(), + txEdit.getTruncateInvalidTx(), 
txEdit.getTruncateInvalidTxTime(), + txEdit.getParentWritePointer(), txEdit.getCheckpointPointers()); + } + @Override public void write(DataOutput out) throws IOException { TransactionEditCodecs.encode(this, out); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/afe31fb6/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java index 7b9f06b..7a34e55 100644 --- a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java @@ -86,16 +86,26 @@ public class HDFSTransactionLogTest { } private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs, - long timeInMillis, boolean withMarker) throws IOException { + long timeInMillis, byte versionNumber) throws IOException { String snapshotDir = configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR); Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis); SequenceFile.Metadata metadata = new SequenceFile.Metadata(); - if (withMarker) { + if (versionNumber > 1) { metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY), - new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION))); + new Text(Byte.toString(versionNumber))); + } + + switch (versionNumber) { + case 1: + case 2: + return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class, + co.cask.tephra.persist.TransactionEdit.class, + SequenceFile.CompressionType.NONE, null, null, metadata); + default: + return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class, + TransactionEdit.class, SequenceFile.CompressionType.NONE, + null, null, metadata); } - return SequenceFile.createWriter(fs, 
configuration, newLog, LongWritable.class, - TransactionEdit.class, SequenceFile.CompressionType.NONE, null, null, metadata); } private void writeNumWrites(SequenceFile.Writer writer, final int size) throws Exception { @@ -103,33 +113,101 @@ public class HDFSTransactionLogTest { CommitMarkerCodec.writeMarker(writer, size); } - private void testTransactionLogSync(int totalCount, int batchSize, boolean withMarker, boolean isComplete) + private void testCaskTransactionLogSync(int totalCount, int batchSize, byte versionNumber, boolean isComplete) throws Exception { - List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount); + List<co.cask.tephra.persist.TransactionEdit> edits = TransactionEditUtil.createRandomCaskEdits(totalCount); long timestamp = System.currentTimeMillis(); Configuration configuration = getConfiguration(); FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration); - SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, withMarker); + SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber); AtomicLong logSequence = new AtomicLong(); HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp); - AbstractTransactionLog.Entry entry; + AbstractTransactionLog.CaskEntry entry; for (int i = 0; i < totalCount - batchSize; i += batchSize) { - if (withMarker) { + if (versionNumber > 1) { writeNumWrites(writer, batchSize); } for (int j = 0; j < batchSize; j++) { - entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j)); + entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()), edits.get(j)); writer.append(entry.getKey(), entry.getEdit()); } writer.syncFs(); } - if (withMarker) { + if (versionNumber > 1) { writeNumWrites(writer, batchSize); } for (int i = totalCount - batchSize; i < totalCount - 1; i++) { + entry = new 
AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()), edits.get(i)); + writer.append(entry.getKey(), entry.getEdit()); + } + + entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()), + edits.get(totalCount - 1)); + if (isComplete) { + writer.append(entry.getKey(), entry.getEdit()); + } else { + byte[] bytes = Longs.toByteArray(entry.getKey().get()); + writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes() { + @Override + public void writeUncompressedBytes(DataOutputStream outStream) throws IOException { + byte[] test = new byte[]{0x2}; + outStream.write(test, 0, 1); + } + + @Override + public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException { + // no-op + } + + @Override + public int getSize() { + // mimic size longer than the actual byte array size written, so we would reach EOF + return 12; + } + }); + } + writer.syncFs(); + Closeables.closeQuietly(writer); + + // now let's try to read this log + TransactionLogReader reader = transactionLog.getReader(); + int syncedEdits = 0; + while (reader.next() != null) { + // testing reading the transaction edits + syncedEdits++; + } + if (isComplete) { + Assert.assertEquals(totalCount, syncedEdits); + } else { + Assert.assertEquals(totalCount - batchSize, syncedEdits); + } + } + + private void testTransactionLogSync(int totalCount, int batchSize, byte versionNumber, boolean isComplete) + throws Exception { + List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount); + long timestamp = System.currentTimeMillis(); + Configuration configuration = getConfiguration(); + FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration); + SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber); + AtomicLong logSequence = new AtomicLong(); + HDFSTransactionLog transactionLog = 
getHDFSTransactionLog(configuration, fs, timestamp); + AbstractTransactionLog.Entry entry; + + for (int i = 0; i < totalCount - batchSize; i += batchSize) { + writeNumWrites(writer, batchSize); + for (int j = 0; j < batchSize; j++) { + entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j)); + writer.append(entry.getKey(), entry.getEdit()); + } + writer.syncFs(); + } + + writeNumWrites(writer, batchSize); + for (int i = totalCount - batchSize; i < totalCount - 1; i++) { entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i)); writer.append(entry.getKey(), entry.getEdit()); } @@ -177,22 +255,33 @@ public class HDFSTransactionLogTest { } @Test - public void testTransactionLogNewVersion() throws Exception { + public void testTransactionLogVersion3() throws Exception { + // in-complete sync + testTransactionLogSync(1000, 1, (byte) 3, false); + testTransactionLogSync(2000, 5, (byte) 3, false); + + // complete sync + testTransactionLogSync(1000, 1, (byte) 3, true); + testTransactionLogSync(2000, 5, (byte) 3, true); + } + + @Test + public void testTransactionLogVersion2() throws Exception { // in-complete sync - testTransactionLogSync(1000, 1, true, false); - testTransactionLogSync(2000, 5, true, false); + testCaskTransactionLogSync(1000, 1, (byte) 2, false); + testCaskTransactionLogSync(2000, 5, (byte) 2, false); // complete sync - testTransactionLogSync(1000, 1, true, true); - testTransactionLogSync(2000, 5, true, true); + testCaskTransactionLogSync(1000, 1, (byte) 2, true); + testCaskTransactionLogSync(2000, 5, (byte) 2, true); } @Test public void testTransactionLogOldVersion() throws Exception { // in-complete sync - testTransactionLogSync(1000, 1, false, false); + testCaskTransactionLogSync(1000, 1, (byte) 1, false); // complete sync - testTransactionLogSync(2000, 5, false, true); + testCaskTransactionLogSync(2000, 5, (byte) 1, true); } } 
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/afe31fb6/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java b/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java index 854ccdd..c6e897a 100644 --- a/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java +++ b/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java @@ -24,6 +24,7 @@ import org.apache.tephra.ChangeId; import org.apache.tephra.TransactionType; import org.apache.tephra.persist.TransactionEdit; +import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Set; @@ -34,6 +35,15 @@ import java.util.Set; public final class TransactionEditUtil { private static Random random = new Random(); + public static List<TransactionEdit> createRandomEdits(int numEntries) { + List<co.cask.tephra.persist.TransactionEdit> caskTxEdits = createRandomCaskEdits(numEntries); + List<TransactionEdit> txEdits = new ArrayList<>(); + for (co.cask.tephra.persist.TransactionEdit caskTxEdit : caskTxEdits) { + txEdits.add(TransactionEdit.convertCaskTxEdit(caskTxEdit)); + } + return txEdits; + } + /** * Generates a number of semi-random {@link TransactionEdit} instances. * These are just randomly selected from the possible states, so would not necessarily reflect a real-world @@ -42,32 +52,34 @@ public final class TransactionEditUtil { * @param numEntries how many entries to generate in the returned list. * @return a list of randomly generated transaction log edits. 
*/ - public static List<TransactionEdit> createRandomEdits(int numEntries) { - List<TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries); + public static List<co.cask.tephra.persist.TransactionEdit> createRandomCaskEdits(int numEntries) { + List<co.cask.tephra.persist.TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries); for (int i = 0; i < numEntries; i++) { - TransactionEdit.State nextType = TransactionEdit.State.values()[random.nextInt(6)]; + co.cask.tephra.persist.TransactionEdit.State nextType = + co.cask.tephra.persist.TransactionEdit.State.values()[random.nextInt(6)]; long writePointer = Math.abs(random.nextLong()); switch (nextType) { case INPROGRESS: edits.add( - TransactionEdit.createStarted(writePointer, writePointer - 1, - System.currentTimeMillis() + 300000L, TransactionType.SHORT)); + co.cask.tephra.persist.TransactionEdit.createStarted(writePointer, writePointer - 1, + System.currentTimeMillis() + 300000L, + TransactionType.SHORT)); break; case COMMITTING: - edits.add(TransactionEdit.createCommitting(writePointer, generateChangeSet(10))); + edits.add(co.cask.tephra.persist.TransactionEdit.createCommitting(writePointer, generateChangeSet(10))); break; case COMMITTED: - edits.add(TransactionEdit.createCommitted(writePointer, generateChangeSet(10), writePointer + 1, - random.nextBoolean())); + edits.add(co.cask.tephra.persist.TransactionEdit.createCommitted(writePointer, generateChangeSet(10), + writePointer + 1, random.nextBoolean())); break; case INVALID: - edits.add(TransactionEdit.createInvalid(writePointer)); + edits.add(co.cask.tephra.persist.TransactionEdit.createInvalid(writePointer)); break; case ABORTED: - edits.add(TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null)); + edits.add(co.cask.tephra.persist.TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null)); break; case MOVE_WATERMARK: - edits.add(TransactionEdit.createMoveWatermark(writePointer)); + 
edits.add(co.cask.tephra.persist.TransactionEdit.createMoveWatermark(writePointer)); break; } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/afe31fb6/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java ---------------------------------------------------------------------- diff --git a/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java b/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java index 17c1005..e191f5c 100644 --- a/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java +++ b/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java @@ -68,7 +68,7 @@ import java.util.Random; * <p> * You can run the BalanceBooks application with the following command: * <pre> - * ./bin/tephra run BalanceBooks [num clients] [num iterations] + * ./bin/tephra run org.apache.tephra.examples.BalanceBooks [num clients] [num iterations] * </pre> * where <code>[num clients]</code> is the number of concurrent client threads to use, and * <code>[num iterations]</code> is the number of "transfer" operations to perform per client thread.
