Repository: incubator-tephra
Updated Branches:
  refs/heads/feature/backward-compat [created] c6252e607


Copying old classes to provide backward compatibility with TransactionEdit


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/c6252e60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/c6252e60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/c6252e60

Branch: refs/heads/feature/backward-compat
Commit: c6252e60761ffb2e3070afb49a57286a0a1a1ee3
Parents: ab52491
Author: Gokul Gunasekaran <[email protected]>
Authored: Thu May 12 10:57:04 2016 -0700
Committer: Gokul Gunasekaran <[email protected]>
Committed: Thu May 12 10:57:04 2016 -0700

----------------------------------------------------------------------
 .../co/cask/tephra/persist/TransactionEdit.java | 362 +++++++++++++++++++
 .../tephra/persist/TransactionEditCodecs.java   | 313 ++++++++++++++++
 2 files changed, 675 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/c6252e60/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java 
b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
new file mode 100644
index 0000000..c76061f
--- /dev/null
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package co.cask.tephra.persist;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.io.Writable;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.persist.TransactionLog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Represents a transaction state change in the {@link TransactionLog}.
+ */
+public class TransactionEdit implements Writable {
+
+  /**
+   * The possible state changes for a transaction.
+   */
+  public enum State {
+    INPROGRESS, COMMITTING, COMMITTED, INVALID, ABORTED, MOVE_WATERMARK, 
TRUNCATE_INVALID_TX, CHECKPOINT
+  }
+
+  private long writePointer;
+
+  /**
+   * stores the value of visibility upper bound
+   * (see {@link TransactionManager.InProgressTx#getVisibilityUpperBound()})
+   * for edit of {@link State#INPROGRESS} only
+   */
+  private long visibilityUpperBound;
+  private long commitPointer;
+  private long expirationDate;
+  private State state;
+  private Set<ChangeId> changes;
+  /** Whether or not the COMMITTED change should be fully committed. */
+  private boolean canCommit;
+  private TransactionType type;
+  private Set<Long> truncateInvalidTx;
+  private long truncateInvalidTxTime;
+  private long parentWritePointer;
+  private long[] checkpointPointers;
+
+  // for Writable
+  public TransactionEdit() {
+    this.changes = Sets.newHashSet();
+    this.truncateInvalidTx = Sets.newHashSet();
+  }
+
+  // package private for testing
+  TransactionEdit(long writePointer, long visibilityUpperBound, State state, 
long expirationDate,
+                  Set<ChangeId> changes, long commitPointer, boolean 
canCommit, TransactionType type,
+                  Set<Long> truncateInvalidTx, long truncateInvalidTxTime, 
long parentWritePointer,
+                  long[] checkpointPointers) {
+    this.writePointer = writePointer;
+    this.visibilityUpperBound = visibilityUpperBound;
+    this.state = state;
+    this.expirationDate = expirationDate;
+    this.changes = changes != null ? changes : 
Collections.<ChangeId>emptySet();
+    this.commitPointer = commitPointer;
+    this.canCommit = canCommit;
+    this.type = type;
+    this.truncateInvalidTx = truncateInvalidTx != null ? truncateInvalidTx : 
Collections.<Long>emptySet();
+    this.truncateInvalidTxTime = truncateInvalidTxTime;
+    this.parentWritePointer = parentWritePointer;
+    this.checkpointPointers = checkpointPointers;
+  }
+
+  /**
+   * Returns the transaction write pointer assigned for the state change.
+   */
+  public long getWritePointer() {
+    return writePointer;
+  }
+
+  void setWritePointer(long writePointer) {
+    this.writePointer = writePointer;
+  }
+
+  public long getVisibilityUpperBound() {
+    return visibilityUpperBound;
+  }
+
+  void setVisibilityUpperBound(long visibilityUpperBound) {
+    this.visibilityUpperBound = visibilityUpperBound;
+  }
+
+  /**
+   * Returns the type of state change represented.
+   */
+  public State getState() {
+    return state;
+  }
+
+  void setState(State state) {
+    this.state = state;
+  }
+
+  /**
+   * Returns any expiration timestamp (in milliseconds) associated with the 
state change.  This should only
+   * be populated for changes of type {@link State#INPROGRESS}.
+   */
+  public long getExpiration() {
+    return expirationDate;
+  }
+
+  void setExpiration(long expirationDate) {
+    this.expirationDate = expirationDate;
+  }
+
+  /**
+   * @return the set of changed row keys associated with the state change.  
This is only populated for edits
+   * of type {@link State#COMMITTING} or {@link State#COMMITTED}.
+   */
+  public Set<ChangeId> getChanges() {
+    return changes;
+  }
+
+  void setChanges(Set<ChangeId> changes) {
+    this.changes = changes;
+  }
+
+  /**
+   * Returns the write pointer used to commit the row key change set.  This is 
only populated for edits of type
+   * {@link State#COMMITTED}.
+   */
+  public long getCommitPointer() {
+    return commitPointer;
+  }
+
+  void setCommitPointer(long commitPointer) {
+    this.commitPointer = commitPointer;
+  }
+
+  /**
+   * Returns whether or not the transaction should be moved to the committed 
set.  This is only populated for edits
+   * of type {@link State#COMMITTED}.
+   */
+  public boolean getCanCommit() {
+    return canCommit;
+  }
+
+  void setCanCommit(boolean canCommit) {
+    this.canCommit = canCommit;
+  }
+
+  /**
+   * Returns the transaction type. This is only populated for edits of type 
{@link State#INPROGRESS} or
+   * {@link State#ABORTED}.
+   */
+  public TransactionType getType() {
+    return type;
+  }
+
+  void setType(TransactionType type) {
+    this.type = type;
+  }
+
+  /**
+   * Returns the transaction ids to be removed from invalid transaction list. 
This is only populated for
+   * edits of type {@link State#TRUNCATE_INVALID_TX}
+   */
+  public Set<Long> getTruncateInvalidTx() {
+    return truncateInvalidTx;
+  }
+
+  void setTruncateInvalidTx(Set<Long> truncateInvalidTx) {
+    this.truncateInvalidTx = truncateInvalidTx;
+  }
+
+  /**
+   * Returns the time until which the invalid transactions need to be 
truncated from invalid transaction list.
+   * This is only populated for  edits of type {@link 
State#TRUNCATE_INVALID_TX}
+   */
+  public long getTruncateInvalidTxTime() {
+    return truncateInvalidTxTime;
+  }
+
+  void setTruncateInvalidTxTime(long truncateInvalidTxTime) {
+    this.truncateInvalidTxTime = truncateInvalidTxTime;
+  }
+
+  /**
+   * Returns the parent write pointer for a checkpoint operation.  This is 
only populated for edits of type
+   * {@link State#CHECKPOINT}
+   */
+  public long getParentWritePointer() {
+    return parentWritePointer;
+  }
+
+  void setParentWritePointer(long parentWritePointer) {
+    this.parentWritePointer = parentWritePointer;
+  }
+
+  /**
+   * Returns the checkpoint write pointers for the edit.  This is only 
populated for edits of type
+   * {@link State#ABORTED}.
+   */
+  public long[] getCheckpointPointers() {
+    return checkpointPointers;
+  }
+
+  void setCheckpointPointers(long[] checkpointPointers) {
+    this.checkpointPointers = checkpointPointers;
+  }
+
+  /**
+   * Creates a new instance in the {@link State#INPROGRESS} state.
+   */
+  public static TransactionEdit createStarted(long writePointer, long 
visibilityUpperBound,
+                                              long expirationDate, 
TransactionType type) {
+    return new TransactionEdit(writePointer, visibilityUpperBound, 
State.INPROGRESS,
+                               expirationDate, null, 0L, false, type, null, 
0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#COMMITTING} state.
+   */
+  public static TransactionEdit createCommitting(long writePointer, 
Set<ChangeId> changes) {
+    return new TransactionEdit(writePointer, 0L, State.COMMITTING, 0L, 
changes, 0L, false, null, null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#COMMITTED} state.
+   */
+  public static TransactionEdit createCommitted(long writePointer, 
Set<ChangeId> changes, long nextWritePointer,
+                                                boolean canCommit) {
+    return new TransactionEdit(writePointer, 0L, State.COMMITTED, 0L, changes, 
nextWritePointer, canCommit, null,
+                               null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#ABORTED} state.
+   */
+  public static TransactionEdit createAborted(long writePointer, 
TransactionType type, long[] checkpointPointers) {
+    return new TransactionEdit(writePointer, 0L, State.ABORTED, 0L, null, 0L, 
false, type, null, 0L, 0L,
+                               checkpointPointers);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#INVALID} state.
+   */
+  public static TransactionEdit createInvalid(long writePointer) {
+    return new TransactionEdit(writePointer, 0L, State.INVALID, 0L, null, 0L, 
false, null, null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#MOVE_WATERMARK} state.
+   */
+  public static TransactionEdit createMoveWatermark(long writePointer) {
+    return new TransactionEdit(writePointer, 0L, State.MOVE_WATERMARK, 0L, 
null, 0L, false, null, null, 0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state.
+   */
+  public static TransactionEdit createTruncateInvalidTx(Set<Long> 
truncateInvalidTx) {
+    return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 
0L, false, null, truncateInvalidTx,
+                               0L, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state.
+   */
+  public static TransactionEdit createTruncateInvalidTxBefore(long 
truncateInvalidTxTime) {
+    return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 
0L, false, null, null,
+                               truncateInvalidTxTime, 0L, null);
+  }
+
+  /**
+   * Creates a new instance in the {@link State#CHECKPOINT} state.
+   */
+  public static TransactionEdit createCheckpoint(long writePointer, long 
parentWritePointer) {
+    return new TransactionEdit(writePointer, 0L, State.CHECKPOINT, 0L, null, 
0L, false, null, null, 0L,
+                               parentWritePointer, null);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    TransactionEditCodecs.encode(this, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    TransactionEditCodecs.decode(this, in);
+  }
+
+  @Override
+  public final boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof TransactionEdit)) {
+      return false;
+    }
+
+    TransactionEdit that = (TransactionEdit) o;
+
+    return Objects.equal(this.writePointer, that.writePointer) &&
+      Objects.equal(this.visibilityUpperBound, that.visibilityUpperBound) &&
+      Objects.equal(this.commitPointer, that.commitPointer) &&
+      Objects.equal(this.expirationDate, that.expirationDate) &&
+      Objects.equal(this.state, that.state) &&
+      Objects.equal(this.changes, that.changes) &&
+      Objects.equal(this.canCommit, that.canCommit) &&
+      Objects.equal(this.type, that.type) &&
+      Objects.equal(this.truncateInvalidTx, that.truncateInvalidTx) &&
+      Objects.equal(this.truncateInvalidTxTime, that.truncateInvalidTxTime) &&
+      Objects.equal(this.parentWritePointer, that.parentWritePointer) &&
+      Arrays.equals(this.checkpointPointers, that.checkpointPointers);
+  }
+
+  @Override
+  public final int hashCode() {
+    return Objects.hashCode(writePointer, visibilityUpperBound, commitPointer, 
expirationDate, state, changes,
+                            canCommit, type, parentWritePointer, 
checkpointPointers);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("writePointer", writePointer)
+      .add("visibilityUpperBound", visibilityUpperBound)
+      .add("commitPointer", commitPointer)
+      .add("expiration", expirationDate)
+      .add("state", state)
+      .add("changesSize", changes != null ? changes.size() : 0)
+      .add("canCommit", canCommit)
+      .add("type", type)
+      .add("truncateInvalidTx", truncateInvalidTx)
+      .add("truncateInvalidTxTime", truncateInvalidTxTime)
+      .add("parentWritePointer", parentWritePointer)
+      .add("checkpointPointers", checkpointPointers)
+      .toString();
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/c6252e60/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java 
b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
new file mode 100644
index 0000000..bf4d544
--- /dev/null
+++ 
b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package co.cask.tephra.persist;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Utilities to handle encoding and decoding of {@link TransactionEdit} 
entries, while maintaining compatibility
+ * with older versions of the serialized data.
+ */
+public class TransactionEditCodecs {
+
+  private static final TransactionEditCodec[] ALL_CODECS = {
+      new TransactionEditCodecV1(),
+      new TransactionEditCodecV2(),
+      new TransactionEditCodecV3(),
+      new TransactionEditCodecV4()
+  };
+
+  private static final SortedMap<Byte, TransactionEditCodec> CODECS = new 
TreeMap<>();
+  static {
+    for (TransactionEditCodec codec : ALL_CODECS) {
+      CODECS.put(codec.getVersion(), codec);
+    }
+  }
+
+  /**
+   * Deserializes the encoded data from the given input stream, setting the 
values as fields
+   * on the given {@code TransactionEdit} instances.  This method expects 
first value in the
+   * {code DataInput} to be a byte representing the codec version used to 
serialize the instance.
+   *
+   * @param dest the transaction edit to populate with the deserialized values
+   * @param in the input stream containing the encoded data
+   * @throws IOException if an error occurs while deserializing from the input 
stream
+   */
+  public static void decode(TransactionEdit dest, DataInput in) throws 
IOException {
+    byte version = in.readByte();
+    TransactionEditCodec codec = CODECS.get(version);
+    if (codec == null) {
+      throw new IOException("TransactionEdit was serialized with an unknown 
codec version " + version +
+          ". Was it written with a newer version of Tephra?");
+    }
+    codec.decode(dest, in);
+  }
+
+  /**
+   * Serializes the given {@code TransactionEdit} instance with the latest 
available codec.
+   * This will first write out the version of the codec used to serialize the 
instance so that
+   * the correct codec can be used when calling {@link 
#decode(TransactionEdit, DataInput)}.
+   *
+   * @param src the transaction edit to serialize
+   * @param out the output stream to contain the data
+   * @throws IOException if an error occurs while serializing to the output 
stream
+   */
+  public static void encode(TransactionEdit src, DataOutput out) throws 
IOException {
+    TransactionEditCodec latestCodec = CODECS.get(CODECS.firstKey());
+    out.writeByte(latestCodec.getVersion());
+    latestCodec.encode(src, out);
+  }
+
+  /**
+   * Encodes the given transaction edit using a specific codec.  Note that 
this is only exposed
+   * for use by tests.
+   */
+  @VisibleForTesting
+  static void encode(TransactionEdit src, DataOutput out, TransactionEditCodec 
codec) throws IOException {
+    out.writeByte(codec.getVersion());
+    codec.encode(src, out);
+  }
+
+  /**
+   * Defines the interface used for encoding and decoding {@link 
TransactionEdit} instances to and from
+   * binary representations.
+   */
+  interface TransactionEditCodec {
+    /**
+     * Reads the encoded values from the data input stream and sets the fields 
in the given {@code TransactionEdit}
+     * instance.
+     *
+     * @param dest the instance on which to set all the deserialized values
+     * @param in the input stream containing the serialized data
+     * @throws IOException if an error occurs while deserializing the data
+     */
+    void decode(TransactionEdit dest, DataInput in) throws IOException;
+
+    /**
+     * Writes all the field values from the {@code TransactionEdit} instance 
in serialized form to the data
+     * output stream.
+     *
+     * @param src the instance to serialize to the stream
+     * @param out the output stream to contain the data
+     * @throws IOException if an error occurs while serializing the data
+     */
+    void encode(TransactionEdit src, DataOutput out) throws IOException;
+
+    /**
+     * Returns the version number for this codec.  Each codec should use a 
unique version number, with the newest
+     * codec having the lowest number.
+     */
+    byte getVersion();
+  }
+
+
+  // package-private for unit-test access
+  static class TransactionEditCodecV1 implements TransactionEditCodec {
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      dest.setWritePointer(in.readLong());
+      int stateIdx = in.readInt();
+      try {
+        dest.setState(TransactionEdit.State.values()[stateIdx]);
+      } catch (ArrayIndexOutOfBoundsException e) {
+        throw new IOException("State enum ordinal value is out of range: " + 
stateIdx);
+      }
+      dest.setExpiration(in.readLong());
+      dest.setCommitPointer(in.readLong());
+      dest.setCanCommit(in.readBoolean());
+      int changeSize = in.readInt();
+      Set<ChangeId> changes = Sets.newHashSet();
+      for (int i = 0; i < changeSize; i++) {
+        int currentLength = in.readInt();
+        byte[] currentBytes = new byte[currentLength];
+        in.readFully(currentBytes);
+        changes.add(new ChangeId(currentBytes));
+      }
+      dest.setChanges(changes);
+      // 1st version did not store this info. It is safe to set 
firstInProgress to 0, it may decrease performance until
+      // this tx is finished, but correctness will be preserved.
+      dest.setVisibilityUpperBound(0);
+    }
+
+    /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} 
instead, it is still here for
+     *  unit-tests only */
+    @Override
+    @Deprecated
+    public void encode(TransactionEdit src, DataOutput out) throws IOException 
{
+      out.writeLong(src.getWritePointer());
+      // use ordinal for predictable size, though this does not support 
evolution
+      out.writeInt(src.getState().ordinal());
+      out.writeLong(src.getExpiration());
+      out.writeLong(src.getCommitPointer());
+      out.writeBoolean(src.getCanCommit());
+      Set<ChangeId> changes = src.getChanges();
+      if (changes == null) {
+        out.writeInt(0);
+      } else {
+        out.writeInt(changes.size());
+        for (ChangeId c : changes) {
+          byte[] cKey = c.getKey();
+          out.writeInt(cKey.length);
+          out.write(cKey);
+        }
+      }
+      // NOTE: we didn't have visibilityUpperBound in V1, it was added in V2
+      // we didn't have transaction type, truncateInvalidTx and 
truncateInvalidTxTime in V1 and V2,
+      // it was added in V3
+    }
+
+    @Override
+    public byte getVersion() {
+      return -1;
+    }
+  }
+
+  // package-private for unit-test access
+  static class TransactionEditCodecV2 extends TransactionEditCodecV1 
implements TransactionEditCodec {
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      super.decode(dest, in);
+      dest.setVisibilityUpperBound(in.readLong());
+    }
+
+    /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} 
instead, it is still here for
+     *  unit-tests only */
+    @Override
+    public void encode(TransactionEdit src, DataOutput out) throws IOException 
{
+      super.encode(src, out);
+      out.writeLong(src.getVisibilityUpperBound());
+      // NOTE: we didn't have transaction type, truncateInvalidTx and 
truncateInvalidTxTime in V1 and V2,
+      // it was added in V3
+    }
+
+    @Override
+    public byte getVersion() {
+      return -2;
+    }
+  }
+
+  // TODO: refactor to avoid duplicate code among different version of codecs
+  // package-private for unit-test access
+  static class TransactionEditCodecV3 extends TransactionEditCodecV2 
implements TransactionEditCodec {
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      super.decode(dest, in);
+      int typeIdx = in.readInt();
+      // null transaction type is represented as -1
+      if (typeIdx < 0) {
+        dest.setType(null);
+      } else {
+        try {
+          dest.setType(TransactionType.values()[typeIdx]);
+        } catch (ArrayIndexOutOfBoundsException e) {
+          throw new IOException("Type enum ordinal value is out of range: " + 
typeIdx);
+        }
+      }
+
+      int truncateInvalidTxSize = in.readInt();
+      Set<Long> truncateInvalidTx = emptySet(dest.getTruncateInvalidTx());
+      for (int i = 0; i < truncateInvalidTxSize; i++) {
+        truncateInvalidTx.add(in.readLong());
+      }
+      dest.setTruncateInvalidTx(truncateInvalidTx);
+      dest.setTruncateInvalidTxTime(in.readLong());
+    }
+
+    private <T> Set<T> emptySet(Set<T> set) {
+      if (set == null) {
+        return Sets.newHashSet();
+      }
+      set.clear();
+      return set;
+    }
+
+    @Override
+    public void encode(TransactionEdit src, DataOutput out) throws IOException 
{
+      super.encode(src, out);
+      // null transaction type is represented as -1
+      if (src.getType() == null) {
+        out.writeInt(-1);
+      } else {
+        out.writeInt(src.getType().ordinal());
+      }
+
+      Set<Long> truncateInvalidTx = src.getTruncateInvalidTx();
+      if (truncateInvalidTx == null) {
+        out.writeInt(0);
+      } else {
+        out.writeInt(truncateInvalidTx.size());
+        for (long id : truncateInvalidTx) {
+          out.writeLong(id);
+        }
+      }
+      out.writeLong(src.getTruncateInvalidTxTime());
+    }
+
+    @Override
+    public byte getVersion() {
+      return -3;
+    }
+  }
+
+  static class TransactionEditCodecV4 extends TransactionEditCodecV3 {
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      super.decode(dest, in);
+      dest.setParentWritePointer(in.readLong());
+      int checkpointPointersLen = in.readInt();
+      if (checkpointPointersLen >= 0) {
+        long[] checkpointPointers = new long[checkpointPointersLen];
+        for (int i = 0; i < checkpointPointersLen; i++) {
+          checkpointPointers[i] = in.readLong();
+        }
+        dest.setCheckpointPointers(checkpointPointers);
+      }
+    }
+
+    @Override
+    public void encode(TransactionEdit src, DataOutput out) throws IOException 
{
+      super.encode(src, out);
+      out.writeLong(src.getParentWritePointer());
+      long[] checkpointPointers = src.getCheckpointPointers();
+      if (checkpointPointers == null) {
+        out.writeInt(-1);
+      } else {
+        out.writeInt(checkpointPointers.length);
+        for (int i = 0; i < checkpointPointers.length; i++) {
+          out.writeLong(checkpointPointers[i]);
+        }
+      }
+    }
+
+    @Override
+    public byte getVersion() {
+      return -4;
+    }
+  }
+}

Reply via email to