Repository: phoenix Updated Branches: refs/heads/master 1c2b9b0e7 -> 9478d1f2b
PHOENIX-2477 ClassCastException in IndexedWALEditCodec after HBASE-14501 (possible dataloss) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9478d1f2 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9478d1f2 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9478d1f2 Branch: refs/heads/master Commit: 9478d1f2b4faef9f698c0c7ffa6654f67b8253e5 Parents: 1c2b9b0 Author: Enis Soztutar <[email protected]> Authored: Thu Dec 3 19:21:46 2015 -0800 Committer: Enis Soztutar <[email protected]> Committed: Thu Dec 3 19:21:46 2015 -0800 ---------------------------------------------------------------------- .../regionserver/wal/IndexedWALEditCodec.java | 56 ++++++++++++++++---- 1 file changed, 46 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/9478d1f2/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java index c29f77d..2534b34 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -80,9 +82,35 @@ public class IndexedWALEditCodec extends WALCellCodec { } /** + * Returns a DataInput given an InputStream + */ + private static DataInput getDataInput(InputStream is) { + return is instanceof DataInput + ? (DataInput) is + : new DataInputStream(is); + } + + /** + * Returns a DataOutput given an OutputStream + */ + private static DataOutput getDataOutput(OutputStream os) { + return os instanceof DataOutput + ? (DataOutput) os + : new DataOutputStream(os); + } + + private static abstract class PhoenixBaseDecoder extends BaseDecoder { + protected DataInput dataInput; + public PhoenixBaseDecoder(InputStream in) { + super(in); + dataInput = getDataInput(this.in); + } + } + + /** * Custom Decoder that can handle a stream of regular and indexed {@link KeyValue}s. */ - public class IndexKeyValueDecoder extends BaseDecoder { + public static class IndexKeyValueDecoder extends PhoenixBaseDecoder { /** * Create a Decoder on the given input stream with the given Decoder to parse @@ -95,11 +123,11 @@ public class IndexedWALEditCodec extends WALCellCodec { @Override protected KeyValue parseCell() throws IOException{ - return KeyValueCodec.readKeyValue((DataInput) this.in); + return KeyValueCodec.readKeyValue(this.dataInput); } } - public class CompressedIndexKeyValueDecoder extends BaseDecoder { + public static class CompressedIndexKeyValueDecoder extends PhoenixBaseDecoder { private Decoder decoder; @@ -133,7 +161,15 @@ public class IndexedWALEditCodec extends WALCellCodec { } // its an indexedKeyValue, so parse it out specially - return KeyValueCodec.readKeyValue((DataInput) this.in); + return KeyValueCodec.readKeyValue(this.dataInput); + } + } + + private static abstract class PhoenixBaseEncoder extends BaseEncoder { + protected DataOutput dataOutput; + public PhoenixBaseEncoder(OutputStream out) { + super(out); + dataOutput = getDataOutput(this.out); } } @@ -141,7 +177,7 @@ public class IndexedWALEditCodec extends WALCellCodec { * Encode {@link IndexedKeyValue}s via the {@link KeyValueCodec}. Does <b>not</b> support * compression. */ - private static class IndexKeyValueEncoder extends BaseEncoder { + private static class IndexKeyValueEncoder extends PhoenixBaseEncoder { public IndexKeyValueEncoder(OutputStream os) { super(os); } @@ -157,7 +193,7 @@ public class IndexedWALEditCodec extends WALCellCodec { checkFlushed(); // use the standard encoding mechanism - KeyValueCodec.write((DataOutput) this.out, KeyValueUtil.ensureKeyValue(cell)); + KeyValueCodec.write(this.dataOutput, KeyValueUtil.ensureKeyValue(cell)); } } @@ -166,7 +202,7 @@ public class IndexedWALEditCodec extends WALCellCodec { * <b>not</b> compatible with the {@link IndexKeyValueDecoder} - one cannot intermingle compressed * and uncompressed WALs that contain index entries. */ - private static class CompressedIndexKeyValueEncoder extends BaseEncoder { + private static class CompressedIndexKeyValueEncoder extends PhoenixBaseEncoder { private Encoder compressedKvEncoder; public CompressedIndexKeyValueEncoder(OutputStream os, Encoder compressedKvEncoder) { @@ -184,20 +220,20 @@ public class IndexedWALEditCodec extends WALCellCodec { public void write(Cell cell) throws IOException { //make sure we are open checkFlushed(); - + //write the special marker so we can figure out which kind of kv is it int marker = IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER; if (cell instanceof IndexedKeyValue) { marker = KeyValueCodec.INDEX_TYPE_LENGTH_MARKER; } out.write(marker); - + //then serialize based on the marker if (marker == IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER) { this.compressedKvEncoder.write(cell); } else{ - KeyValueCodec.write((DataOutput) out, KeyValueUtil.ensureKeyValue(cell)); + KeyValueCodec.write(this.dataOutput, KeyValueUtil.ensureKeyValue(cell)); } } }
