HADOOP-13010. Refactor raw erasure coders. Contributed by Kai Zheng

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/77202fa1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/77202fa1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/77202fa1

Branch: refs/heads/trunk
Commit: 77202fa1035a54496d11d07472fbc399148ff630
Parents: 4f513a4
Author: Kai Zheng <kai.zh...@intel.com>
Authored: Fri May 27 13:23:34 2016 +0800
Committer: Kai Zheng <kai.zh...@intel.com>
Committed: Fri May 27 13:23:34 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/io/erasurecode/CodecUtil.java | 113 +++-------
 .../io/erasurecode/ErasureCoderOptions.java     |  89 ++++++++
 .../erasurecode/coder/HHXORErasureDecoder.java  |  18 +-
 .../erasurecode/coder/HHXORErasureEncoder.java  |  15 +-
 .../io/erasurecode/coder/RSErasureDecoder.java  |   6 +-
 .../io/erasurecode/coder/RSErasureEncoder.java  |   6 +-
 .../io/erasurecode/coder/XORErasureDecoder.java |   6 +-
 .../io/erasurecode/coder/XORErasureEncoder.java |   6 +-
 .../rawcoder/AbstractRawErasureCoder.java       | 220 -------------------
 .../rawcoder/AbstractRawErasureDecoder.java     | 181 ---------------
 .../rawcoder/AbstractRawErasureEncoder.java     | 146 ------------
 .../rawcoder/ByteArrayDecodingState.java        | 111 ++++++++++
 .../rawcoder/ByteArrayEncodingState.java        |  81 +++++++
 .../rawcoder/ByteBufferDecodingState.java       | 134 +++++++++++
 .../rawcoder/ByteBufferEncodingState.java       |  98 +++++++++
 .../io/erasurecode/rawcoder/CoderOption.java    |  43 ----
 .../io/erasurecode/rawcoder/CoderUtil.java      | 199 +++++++++++++++++
 .../io/erasurecode/rawcoder/DecodingState.java  |  55 +++++
 .../erasurecode/rawcoder/DummyRawDecoder.java   |  16 +-
 .../erasurecode/rawcoder/DummyRawEncoder.java   |  15 +-
 .../rawcoder/DummyRawErasureCoderFactory.java   |  10 +-
 .../io/erasurecode/rawcoder/EncodingState.java  |  44 ++++
 .../io/erasurecode/rawcoder/RSRawDecoder.java   |  48 ++--
 .../rawcoder/RSRawDecoderLegacy.java            |  66 +++---
 .../io/erasurecode/rawcoder/RSRawEncoder.java   |  45 ++--
 .../rawcoder/RSRawEncoderLegacy.java            |  82 ++++---
 .../rawcoder/RSRawErasureCoderFactory.java      |   9 +-
 .../RSRawErasureCoderFactoryLegacy.java         |   9 +-
 .../erasurecode/rawcoder/RawErasureCoder.java   |  73 ------
 .../rawcoder/RawErasureCoderFactory.java        |  11 +-
 .../erasurecode/rawcoder/RawErasureDecoder.java | 137 +++++++++++-
 .../erasurecode/rawcoder/RawErasureEncoder.java | 135 +++++++++++-
 .../io/erasurecode/rawcoder/XORRawDecoder.java  |  51 +++--
 .../io/erasurecode/rawcoder/XORRawEncoder.java  |  57 ++---
 .../rawcoder/XORRawErasureCoderFactory.java     |   9 +-
 .../io/erasurecode/rawcoder/package-info.java   |  38 ++++
 .../io/erasurecode/rawcoder/util/CoderUtil.java |  83 -------
 .../erasurecode/rawcoder/util/GaloisField.java  |   4 +-
 .../erasurecode/TestCodecRawCoderMapping.java   |  29 ++-
 .../hadoop/io/erasurecode/TestCoderBase.java    |  14 +-
 .../erasurecode/rawcoder/TestDummyRawCoder.java |   2 +-
 .../erasurecode/rawcoder/TestRawCoderBase.java  |  50 +++--
 .../hadoop/hdfs/DFSStripedInputStream.java      |   7 +-
 .../hadoop/hdfs/DFSStripedOutputStream.java     |   7 +-
 .../erasurecode/StripedReconstructor.java       |   7 +-
 .../apache/hadoop/hdfs/StripedFileTestUtil.java |   8 +-
 .../hadoop/hdfs/TestDFSStripedInputStream.java  |  23 +-
 47 files changed, 1496 insertions(+), 1120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
index fcce071..9cd9561 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
@@ -36,115 +35,61 @@ public final class CodecUtil {
 
   /**
    * Create RS raw encoder according to configuration.
-   * @param conf configuration possibly with some items to configure the coder
-   * @param numDataUnits number of data units in a coding group
-   * @param numParityUnits number of parity units in a coding group
+   * @param conf configuration
+   * @param coderOptions coder options that's used to create the coder
    * @param codec the codec to use. If null, will use the default codec
    * @return raw encoder
    */
-  public static RawErasureEncoder createRSRawEncoder(
-      Configuration conf, int numDataUnits, int numParityUnits, String codec) {
+  public static RawErasureEncoder createRawEncoder(
+      Configuration conf, String codec, ErasureCoderOptions coderOptions) {
     Preconditions.checkNotNull(conf);
-    if (codec == null) {
-      codec = ErasureCodeConstants.RS_DEFAULT_CODEC_NAME;
-    }
-    RawErasureCoder rawCoder = createRawCoder(conf,
-        getFactNameFromCodec(conf, codec), true, numDataUnits, numParityUnits);
-    return (RawErasureEncoder) rawCoder;
-  }
+    Preconditions.checkNotNull(codec);
 
-  /**
-   * Create RS raw encoder using the default codec.
-   */
-  public static RawErasureEncoder createRSRawEncoder(
-      Configuration conf, int numDataUnits, int numParityUnits) {
-    return createRSRawEncoder(conf, numDataUnits, numParityUnits, null);
+    String rawCoderFactoryKey = getFactNameFromCodec(conf, codec);
+
+    RawErasureCoderFactory fact = createRawCoderFactory(conf,
+        rawCoderFactoryKey);
+
+    return fact.createEncoder(coderOptions);
   }
 
   /**
    * Create RS raw decoder according to configuration.
-   * @param conf configuration possibly with some items to configure the coder
-   * @param numDataUnits number of data units in a coding group
-   * @param numParityUnits number of parity units in a coding group
+   * @param conf configuration
+   * @param coderOptions coder options that's used to create the coder
    * @param codec the codec to use. If null, will use the default codec
    * @return raw decoder
    */
-  public static RawErasureDecoder createRSRawDecoder(
-      Configuration conf, int numDataUnits, int numParityUnits, String codec) {
+  public static RawErasureDecoder createRawDecoder(
+      Configuration conf, String codec, ErasureCoderOptions coderOptions) {
     Preconditions.checkNotNull(conf);
-    if (codec == null) {
-      codec = ErasureCodeConstants.RS_DEFAULT_CODEC_NAME;
-    }
-    RawErasureCoder rawCoder = createRawCoder(conf,
-        getFactNameFromCodec(conf, codec), false, numDataUnits, numParityUnits);
-    return (RawErasureDecoder) rawCoder;
-  }
+    Preconditions.checkNotNull(codec);
 
-  /**
-   * Create RS raw decoder using the default codec.
-   */
-  public static RawErasureDecoder createRSRawDecoder(
-      Configuration conf, int numDataUnits, int numParityUnits) {
-    return createRSRawDecoder(conf, numDataUnits, numParityUnits, null);
-  }
+    String rawCoderFactoryKey = getFactNameFromCodec(conf, codec);
 
-  /**
-   * Create XOR raw encoder according to configuration.
-   * @param conf configuration possibly with some items to configure the coder
-   * @param numDataUnits number of data units in a coding group
-   * @param numParityUnits number of parity units in a coding group
-   * @return raw encoder
-   */
-  public static RawErasureEncoder createXORRawEncoder(
-      Configuration conf, int numDataUnits, int numParityUnits) {
-    Preconditions.checkNotNull(conf);
-    RawErasureCoder rawCoder = createRawCoder(conf,
-        getFactNameFromCodec(conf, ErasureCodeConstants.XOR_CODEC_NAME),
-        true, numDataUnits, numParityUnits);
-    return (RawErasureEncoder) rawCoder;
-  }
+    RawErasureCoderFactory fact = createRawCoderFactory(conf,
+        rawCoderFactoryKey);
 
-  /**
-   * Create XOR raw decoder according to configuration.
-   * @param conf configuration possibly with some items to configure the coder
-   * @param numDataUnits number of data units in a coding group
-   * @param numParityUnits number of parity units in a coding group
-   * @return raw decoder
-   */
-  public static RawErasureDecoder createXORRawDecoder(
-      Configuration conf, int numDataUnits, int numParityUnits) {
-    Preconditions.checkNotNull(conf);
-    RawErasureCoder rawCoder = createRawCoder(conf,
-        getFactNameFromCodec(conf, ErasureCodeConstants.XOR_CODEC_NAME),
-        false, numDataUnits, numParityUnits);
-    return (RawErasureDecoder) rawCoder;
+    return fact.createDecoder(coderOptions);
   }
 
-  /**
-   * Create raw coder using specified conf and raw coder factory key.
-   * @param conf configuration possibly with some items to configure the coder
-   * @param rawCoderFactory name of the raw coder factory
-   * @param isEncoder is encoder or not we're going to create
-   * @param numDataUnits number of data units in a coding group
-   * @param numParityUnits number of parity units in a coding group
-   * @return raw coder
-   */
-  public static RawErasureCoder createRawCoder(Configuration conf,
-      String rawCoderFactory, boolean isEncoder, int numDataUnits,
-                                               int numParityUnits) {
-
+  private static RawErasureCoderFactory createRawCoderFactory(
+      Configuration conf, String rawCoderFactoryKey) {
     RawErasureCoderFactory fact;
     try {
       Class<? extends RawErasureCoderFactory> factClass = conf.getClassByName(
-          rawCoderFactory).asSubclass(RawErasureCoderFactory.class);
+          rawCoderFactoryKey).asSubclass(RawErasureCoderFactory.class);
       fact = factClass.newInstance();
     } catch (ClassNotFoundException | InstantiationException |
         IllegalAccessException e) {
-      throw new RuntimeException("Failed to create raw coder", e);
+      throw new RuntimeException("Failed to create raw coder factory", e);
+    }
+
+    if (fact == null) {
+      throw new RuntimeException("Failed to create raw coder factory");
     }
 
-    return isEncoder ? fact.createEncoder(numDataUnits, numParityUnits) :
-            fact.createDecoder(numDataUnits, numParityUnits);
+    return fact;
   }
 
  private static String getFactNameFromCodec(Configuration conf, String codec) {

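For context, the net effect of this CodecUtil change on callers: the per-codec createRSRawEncoder/createXORRawEncoder entry points are replaced by codec-name-driven createRawEncoder/createRawDecoder that take an ErasureCoderOptions. A minimal caller sketch against the signatures above; the class name and the 6+3 schema are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.erasurecode.CodecUtil;
    import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
    import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

    public class RawCoderCreationSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Before: CodecUtil.createRSRawEncoder(conf, 6, 3), with a nullable
        // codec argument selecting a default. After: the codec is always
        // named explicitly and the schema travels in the options object.
        ErasureCoderOptions coderOptions = new ErasureCoderOptions(6, 3);

        RawErasureEncoder encoder = CodecUtil.createRawEncoder(
            conf, ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
        RawErasureDecoder decoder = CodecUtil.createRawDecoder(
            conf, ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
      }
    }
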
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCoderOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCoderOptions.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCoderOptions.java
new file mode 100644
index 0000000..106a36c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCoderOptions.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Erasure coder configuration that maintains schema info and coder options.
+ */
+@InterfaceAudience.Private
+public final class ErasureCoderOptions {
+
+  private final int numDataUnits;
+  private final int numParityUnits;
+  private final int numAllUnits;
+  private final boolean allowChangeInputs;
+  private final boolean allowVerboseDump;
+
+  public ErasureCoderOptions(int numDataUnits, int numParityUnits) {
+    this(numDataUnits, numParityUnits, false, false);
+  }
+
+  public ErasureCoderOptions(int numDataUnits, int numParityUnits,
+                        boolean allowChangeInputs, boolean allowVerboseDump) {
+    this.numDataUnits = numDataUnits;
+    this.numParityUnits = numParityUnits;
+    this.numAllUnits = numDataUnits + numParityUnits;
+    this.allowChangeInputs = allowChangeInputs;
+    this.allowVerboseDump = allowVerboseDump;
+  }
+
+  /**
+   * The number of data input units for the coding. A unit can be a byte,
+   * chunk or buffer or even a block.
+   * @return count of data input units
+   */
+  public int getNumDataUnits() {
+    return numDataUnits;
+  }
+
+  /**
+   * The number of parity output units for the coding. A unit can be a byte,
+   * chunk, buffer or even a block.
+   * @return count of parity output units
+   */
+  public int getNumParityUnits() {
+    return numParityUnits;
+  }
+
+  /**
+   * The number of all the involved units in the coding.
+   * @return count of all the data units and parity units
+   */
+  public int getNumAllUnits() {
+    return numAllUnits;
+  }
+
+  /**
+   * Allow changing input buffer content (not positions). Maybe better
+   * performance if not allowed.
+   * @return true if allowing input content to be changed, false otherwise
+   */
+  public boolean allowChangeInputs() {
+    return allowChangeInputs;
+  }
+
+  /**
+   * Allow dump verbose debug info or not.
+   * @return true if verbose debug info is desired, false otherwise
+   */
+  public boolean allowVerboseDump() {
+    return allowVerboseDump;
+  }
+}

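A quick illustration of the new options class, which replaces the mutable CoderOption map carried by the (now deleted) AbstractRawErasureCoder: everything is fixed at construction, and the two-argument constructor defaults both flags to false. That default is also why the explicit setCoderOption(ALLOW_CHANGE_INPUTS, false) calls disappear from the HHXOR coders below. A small sketch using only the constructors and accessors in this file:

    import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;

    public class ErasureCoderOptionsSketch {
      public static void main(String[] args) {
        // Two-argument form: both behavioral flags default to false.
        ErasureCoderOptions defaults = new ErasureCoderOptions(6, 3);
        System.out.println(defaults.getNumAllUnits());     // 9
        System.out.println(defaults.allowChangeInputs());  // false
        System.out.println(defaults.allowVerboseDump());   // false

        // Four-argument form opts in to input mutation and verbose dumps.
        ErasureCoderOptions tuned = new ErasureCoderOptions(6, 3, true, true);
        System.out.println(tuned.allowChangeInputs());     // true
      }
    }
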
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java
index ac4df16..94487d8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java
@@ -22,7 +22,10 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.*;
+import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
+import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 
 /**
  * Hitchhiker is a new erasure coding algorithm developed as a research project
@@ -68,17 +71,20 @@ public class HHXORErasureDecoder extends AbstractErasureDecoder {
 
   private RawErasureDecoder checkCreateRSRawDecoder() {
     if (rsRawDecoder == null) {
-      rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(),
-              getNumDataUnits(), getNumParityUnits());
+      ErasureCoderOptions coderOptions = new ErasureCoderOptions(
+          getNumDataUnits(), getNumParityUnits());
+      rsRawDecoder = CodecUtil.createRawDecoder(getConf(),
+              ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
     }
     return rsRawDecoder;
   }
 
   private RawErasureEncoder checkCreateXorRawEncoder() {
     if (xorRawEncoder == null) {
-      xorRawEncoder = CodecUtil.createXORRawEncoder(getConf(),
-              getNumDataUnits(), getNumParityUnits());
-      xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false);
+      ErasureCoderOptions coderOptions = new ErasureCoderOptions(
+          getNumDataUnits(), getNumParityUnits());
+      xorRawEncoder = CodecUtil.createRawEncoder(getConf(),
+          ErasureCodeConstants.XOR_CODEC_NAME, coderOptions);
     }
     return xorRawEncoder;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java
index a402469..219f25c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java
@@ -22,7 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.CoderOption;
+import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
+import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 
 /**
@@ -64,17 +65,21 @@ public class HHXORErasureEncoder extends AbstractErasureEncoder {
 
   private RawErasureEncoder checkCreateRSRawEncoder() {
     if (rsRawEncoder == null) {
-      rsRawEncoder = CodecUtil.createRSRawEncoder(getConf(),
+      ErasureCoderOptions coderOptions = new ErasureCoderOptions(
           getNumDataUnits(), getNumParityUnits());
+      rsRawEncoder = CodecUtil.createRawEncoder(getConf(),
+          ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
     }
     return rsRawEncoder;
   }
 
   private RawErasureEncoder checkCreateXorRawEncoder() {
     if (xorRawEncoder == null) {
-      xorRawEncoder = CodecUtil.createXORRawEncoder(getConf(),
-              getNumDataUnits(), getNumParityUnits());
-      xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false);
+      ErasureCoderOptions erasureCoderOptions = new ErasureCoderOptions(
+          getNumDataUnits(), getNumParityUnits());
+      xorRawEncoder = CodecUtil.createRawEncoder(getConf(),
+          ErasureCodeConstants.XOR_CODEC_NAME,
+          erasureCoderOptions);
     }
     return xorRawEncoder;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
index 47efd29..afaaf24 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
 import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
+import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 
 /**
@@ -55,8 +57,10 @@ public class RSErasureDecoder extends AbstractErasureDecoder {
   private RawErasureDecoder checkCreateRSRawDecoder() {
     if (rsRawDecoder == null) {
       // TODO: we should create the raw coder according to codec.
-      rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(),
+      ErasureCoderOptions coderOptions = new ErasureCoderOptions(
           getNumDataUnits(), getNumParityUnits());
+      rsRawDecoder = CodecUtil.createRawDecoder(getConf(),
+          ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
     }
     return rsRawDecoder;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
index 4806d9e..2139113 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
 import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
+import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 
 /**
@@ -55,8 +57,10 @@ public class RSErasureEncoder extends AbstractErasureEncoder {
   private RawErasureEncoder checkCreateRSRawEncoder() {
     if (rawEncoder == null) {
       // TODO: we should create the raw coder according to codec.
-      rawEncoder = CodecUtil.createRSRawEncoder(getConf(),
+      ErasureCoderOptions coderOptions = new ErasureCoderOptions(
           getNumDataUnits(), getNumParityUnits());
+      rawEncoder = CodecUtil.createRawEncoder(getConf(),
+          ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
     }
     return rawEncoder;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
index a61bafd..47fb8da 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
 import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
+import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 
 /**
@@ -43,8 +45,10 @@ public class XORErasureDecoder extends AbstractErasureDecoder {
   @Override
   protected ErasureCodingStep prepareDecodingStep(
       final ECBlockGroup blockGroup) {
-    RawErasureDecoder rawDecoder = CodecUtil.createXORRawDecoder(getConf(),
+    ErasureCoderOptions coderOptions = new ErasureCoderOptions(
         getNumDataUnits(), getNumParityUnits());
+    RawErasureDecoder rawDecoder = CodecUtil.createRawDecoder(getConf(),
+        ErasureCodeConstants.XOR_CODEC_NAME, coderOptions);
 
     ECBlock[] inputBlocks = getInputBlocks(blockGroup);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java
index 3f22247..1735179 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
 import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
+import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 
 /**
@@ -43,8 +45,10 @@ public class XORErasureEncoder extends AbstractErasureEncoder {
   @Override
   protected ErasureCodingStep prepareEncodingStep(
       final ECBlockGroup blockGroup) {
-    RawErasureEncoder rawEncoder = CodecUtil.createXORRawEncoder(getConf(),
+    ErasureCoderOptions coderOptions = new ErasureCoderOptions(
         getNumDataUnits(), getNumParityUnits());
+    RawErasureEncoder rawEncoder = CodecUtil.createRawEncoder(getConf(),
+        ErasureCodeConstants.XOR_CODEC_NAME, coderOptions);
 
     ECBlock[] inputBlocks = getInputBlocks(blockGroup);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
deleted file mode 100644
index b195216..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.erasurecode.rawcoder;
-
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configured;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A common class of basic facilities to be shared by encoder and decoder
- *
- * It implements the {@link RawErasureCoder} interface.
- */
-@InterfaceAudience.Private
-public abstract class AbstractRawErasureCoder
-    extends Configured implements RawErasureCoder {
-
-  private static byte[] emptyChunk = new byte[4096];
-  private final int numDataUnits;
-  private final int numParityUnits;
-  private final int numAllUnits;
-  private final Map<CoderOption, Object> coderOptions;
-
-  public AbstractRawErasureCoder(int numDataUnits, int numParityUnits) {
-    this.numDataUnits = numDataUnits;
-    this.numParityUnits = numParityUnits;
-    this.numAllUnits = numDataUnits + numParityUnits;
-    this.coderOptions = new HashMap<>(3);
-
-    coderOptions.put(CoderOption.PREFER_DIRECT_BUFFER, preferDirectBuffer());
-    coderOptions.put(CoderOption.ALLOW_CHANGE_INPUTS, false);
-    coderOptions.put(CoderOption.ALLOW_VERBOSE_DUMP, false);
-  }
-
-  @Override
-  public Object getCoderOption(CoderOption option) {
-    if (option == null) {
-      throw new HadoopIllegalArgumentException("Invalid option");
-    }
-    return coderOptions.get(option);
-  }
-
-  @Override
-  public void setCoderOption(CoderOption option, Object value) {
-    if (option == null || value == null) {
-      throw new HadoopIllegalArgumentException(
-          "Invalid option or option value");
-    }
-    if (option.isReadOnly()) {
-      throw new HadoopIllegalArgumentException(
-          "The option is read-only: " + option.name());
-    }
-
-    coderOptions.put(option, value);
-  }
-
-  /**
-   * Make sure to return an empty chunk buffer for the desired length.
-   * @param leastLength
-   * @return empty chunk of zero bytes
-   */
-  protected static byte[] getEmptyChunk(int leastLength) {
-    if (emptyChunk.length >= leastLength) {
-      return emptyChunk; // In most time
-    }
-
-    synchronized (AbstractRawErasureCoder.class) {
-      emptyChunk = new byte[leastLength];
-    }
-
-    return emptyChunk;
-  }
-
-  @Override
-  public int getNumDataUnits() {
-    return numDataUnits;
-  }
-
-  @Override
-  public int getNumParityUnits() {
-    return numParityUnits;
-  }
-
-  protected int getNumAllUnits() {
-    return numAllUnits;
-  }
-
-  @Override
-  public void release() {
-    // Nothing to do by default
-  }
-
-  /**
-   * Tell if direct buffer is preferred or not. It's for callers to
-   * decide how to allocate coding chunk buffers, using DirectByteBuffer or
-   * bytes array. It will return false by default.
-   * @return true if native buffer is preferred for performance consideration,
-   * otherwise false.
-   */
-  protected boolean preferDirectBuffer() {
-    return false;
-  }
-
-  protected boolean isAllowingChangeInputs() {
-    Object value = getCoderOption(CoderOption.ALLOW_CHANGE_INPUTS);
-    if (value != null && value instanceof Boolean) {
-      return (boolean) value;
-    }
-    return false;
-  }
-
-  protected boolean isAllowingVerboseDump() {
-    Object value = getCoderOption(CoderOption.ALLOW_VERBOSE_DUMP);
-    if (value != null && value instanceof Boolean) {
-      return (boolean) value;
-    }
-    return false;
-  }
-
-  /**
-   * Ensure a buffer filled with ZERO bytes from current readable/writable
-   * position.
-   * @param buffer a buffer ready to read / write certain size bytes
-   * @return the buffer itself, with ZERO bytes written, the position and limit
-   *         are not changed after the call
-   */
-  protected ByteBuffer resetBuffer(ByteBuffer buffer, int len) {
-    int pos = buffer.position();
-    buffer.put(getEmptyChunk(len), 0, len);
-    buffer.position(pos);
-
-    return buffer;
-  }
-
-  /**
-   * Ensure the buffer (either input or output) ready to read or write with ZERO
-   * bytes fully in specified length of len.
-   * @param buffer bytes array buffer
-   * @return the buffer itself
-   */
-  protected byte[] resetBuffer(byte[] buffer, int offset, int len) {
-    byte[] empty = getEmptyChunk(len);
-    System.arraycopy(empty, 0, buffer, offset, len);
-
-    return buffer;
-  }
-
-  /**
-   * Check and ensure the buffers are of the length specified by dataLen, also
-   * ensure the buffers are direct buffers or not according to isDirectBuffer.
-   * @param buffers the buffers to check
-   * @param allowNull whether to allow any element to be null or not
-   * @param dataLen the length of data available in the buffer to ensure with
-   * @param isDirectBuffer is direct buffer or not to ensure with
-   * @param isOutputs is output buffer or not
-   */
-  protected void checkParameterBuffers(ByteBuffer[] buffers, boolean
-      allowNull, int dataLen, boolean isDirectBuffer, boolean isOutputs) {
-    for (ByteBuffer buffer : buffers) {
-      if (buffer == null && !allowNull) {
-        throw new HadoopIllegalArgumentException(
-            "Invalid buffer found, not allowing null");
-      } else if (buffer != null) {
-        if (buffer.remaining() != dataLen) {
-          throw new HadoopIllegalArgumentException(
-              "Invalid buffer, not of length " + dataLen);
-        }
-        if (buffer.isDirect() != isDirectBuffer) {
-          throw new HadoopIllegalArgumentException(
-              "Invalid buffer, isDirect should be " + isDirectBuffer);
-        }
-        if (isOutputs) {
-          resetBuffer(buffer, dataLen);
-        }
-      }
-    }
-  }
-
-  /**
-   * Check and ensure the buffers are of the length specified by dataLen. If is
-   * output buffers, ensure they will be ZEROed.
-   * @param buffers the buffers to check
-   * @param allowNull whether to allow any element to be null or not
-   * @param dataLen the length of data available in the buffer to ensure with
-   * @param isOutputs is output buffer or not
-   */
-  protected void checkParameterBuffers(byte[][] buffers, boolean allowNull,
-                                       int dataLen, boolean isOutputs) {
-    for (byte[] buffer : buffers) {
-      if (buffer == null && !allowNull) {
-        throw new HadoopIllegalArgumentException(
-            "Invalid buffer found, not allowing null");
-      } else if (buffer != null && buffer.length != dataLen) {
-        throw new HadoopIllegalArgumentException(
-            "Invalid buffer not of length " + dataLen);
-      } else if (isOutputs) {
-        resetBuffer(buffer, 0, dataLen);
-      }
-    }
-  }
-}

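The deleted class above owned the shared, lazily grown zero chunk used to blank output buffers before coding; per the diffstat, that responsibility moves into the new rawcoder/CoderUtil. A condensed sketch of the pattern, assuming CoderUtil keeps the same approach (a grow-only cache whose races are benign, since every published array is all zeros; the re-check inside the lock is a small hardening not present in the removed code):

    import java.nio.ByteBuffer;

    final class ZeroChunkSketch {
      // Grow-only cache of zeros; a racy read at worst returns a smaller
      // all-zero array, which a later call simply replaces.
      private static byte[] emptyChunk = new byte[4096];

      static byte[] getEmptyChunk(int leastLength) {
        if (emptyChunk.length >= leastLength) {
          return emptyChunk; // the common case, no allocation
        }
        synchronized (ZeroChunkSketch.class) {
          if (emptyChunk.length < leastLength) {
            emptyChunk = new byte[leastLength];
          }
        }
        return emptyChunk;
      }

      // Zero len bytes from the buffer's current position without moving it,
      // mirroring the removed resetBuffer(ByteBuffer, int).
      static ByteBuffer resetBuffer(ByteBuffer buffer, int len) {
        int pos = buffer.position();
        buffer.put(getEmptyChunk(len), 0, len);
        buffer.position(pos);
        return buffer;
      }
    }
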
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
deleted file mode 100644
index cf2b738..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.erasurecode.rawcoder;
-
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.io.erasurecode.ECChunk;
-import org.apache.hadoop.io.erasurecode.rawcoder.util.CoderUtil;
-
-import java.nio.ByteBuffer;
-
-/**
- * An abstract raw erasure decoder that's to be inherited by new decoders.
- *
- * It implements the {@link RawErasureDecoder} interface.
- */
-@InterfaceAudience.Private
-public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
-    implements RawErasureDecoder {
-
-  public AbstractRawErasureDecoder(int numDataUnits, int numParityUnits) {
-    super(numDataUnits, numParityUnits);
-  }
-
-  @Override
-  public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
-                     ByteBuffer[] outputs) {
-    checkParameters(inputs, erasedIndexes, outputs);
-
-    ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs);
-    boolean usingDirectBuffer = validInput.isDirect();
-    int dataLen = validInput.remaining();
-    if (dataLen == 0) {
-      return;
-    }
-    checkParameterBuffers(inputs, true, dataLen, usingDirectBuffer, false);
-    checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true);
-
-    int[] inputPositions = new int[inputs.length];
-    for (int i = 0; i < inputPositions.length; i++) {
-      if (inputs[i] != null) {
-        inputPositions[i] = inputs[i].position();
-      }
-    }
-
-    if (usingDirectBuffer) {
-      doDecode(inputs, erasedIndexes, outputs);
-    } else {
-      int[] inputOffsets = new int[inputs.length];
-      int[] outputOffsets = new int[outputs.length];
-      byte[][] newInputs = new byte[inputs.length][];
-      byte[][] newOutputs = new byte[outputs.length][];
-
-      ByteBuffer buffer;
-      for (int i = 0; i < inputs.length; ++i) {
-        buffer = inputs[i];
-        if (buffer != null) {
-          inputOffsets[i] = buffer.arrayOffset() + buffer.position();
-          newInputs[i] = buffer.array();
-        }
-      }
-
-      for (int i = 0; i < outputs.length; ++i) {
-        buffer = outputs[i];
-        outputOffsets[i] = buffer.arrayOffset() + buffer.position();
-        newOutputs[i] = buffer.array();
-      }
-
-      doDecode(newInputs, inputOffsets, dataLen,
-          erasedIndexes, newOutputs, outputOffsets);
-    }
-
-    for (int i = 0; i < inputs.length; i++) {
-      if (inputs[i] != null) {
-        // dataLen bytes consumed
-        inputs[i].position(inputPositions[i] + dataLen);
-      }
-    }
-  }
-
-  /**
-   * Perform the real decoding using Direct ByteBuffer.
-   * @param inputs Direct ByteBuffers expected
-   * @param erasedIndexes indexes of erased units in the inputs array
-   * @param outputs Direct ByteBuffers expected
-   */
-  protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
-                                   ByteBuffer[] outputs);
-
-  @Override
-  public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
-    checkParameters(inputs, erasedIndexes, outputs);
-
-    byte[] validInput = CoderUtil.findFirstValidInput(inputs);
-    int dataLen = validInput.length;
-    if (dataLen == 0) {
-      return;
-    }
-    checkParameterBuffers(inputs, true, dataLen, false);
-    checkParameterBuffers(outputs, false, dataLen, true);
-
-    int[] inputOffsets = new int[inputs.length]; // ALL ZERO
-    int[] outputOffsets = new int[outputs.length]; // ALL ZERO
-
-    doDecode(inputs, inputOffsets, dataLen, erasedIndexes, outputs,
-        outputOffsets);
-  }
-
-  /**
-   * Perform the real decoding using bytes array, supporting offsets and
-   * lengths.
-   * @param inputs the input byte arrays to read data from
-   * @param inputOffsets offsets for the input byte arrays to read data from
-   * @param dataLen how much data are to be read from
-   * @param erasedIndexes indexes of erased units in the inputs array
-   * @param outputs the output byte arrays to write resultant data into
-   * @param outputOffsets offsets from which to write resultant data into
-   */
-  protected abstract void doDecode(byte[][] inputs, int[] inputOffsets,
-                                   int dataLen, int[] erasedIndexes,
-                                   byte[][] outputs, int[] outputOffsets);
-
-  @Override
-  public void decode(ECChunk[] inputs, int[] erasedIndexes,
-                     ECChunk[] outputs) {
-    ByteBuffer[] newInputs = ECChunk.toBuffers(inputs);
-    ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs);
-    decode(newInputs, erasedIndexes, newOutputs);
-  }
-
-  /**
-   * Check and validate decoding parameters, throw exception accordingly. The
-   * checking assumes it's a MDS code. Other code  can override this.
-   * @param inputs input buffers to check
-   * @param erasedIndexes indexes of erased units in the inputs array
-   * @param outputs output buffers to check
-   */
-  protected <T> void checkParameters(T[] inputs, int[] erasedIndexes,
-                                 T[] outputs) {
-    if (inputs.length != getNumParityUnits() + getNumDataUnits()) {
-      throw new IllegalArgumentException("Invalid inputs length");
-    }
-
-    if (erasedIndexes.length != outputs.length) {
-      throw new HadoopIllegalArgumentException(
-          "erasedIndexes and outputs mismatch in length");
-    }
-
-    if (erasedIndexes.length > getNumParityUnits()) {
-      throw new HadoopIllegalArgumentException(
-          "Too many erased, not recoverable");
-    }
-
-    int validInputs = 0;
-    for (T input : inputs) {
-      if (input != null) {
-        validInputs += 1;
-      }
-    }
-
-    if (validInputs < getNumDataUnits()) {
-      throw new HadoopIllegalArgumentException(
-          "No enough valid inputs are provided, not recoverable");
-    }
-  }
-}

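One detail worth calling out from the deleted decoder, reused verbatim by ByteBufferDecodingState.convertToByteArrayState further down: the heap-buffer fallback addresses the backing array at arrayOffset() + position(), because a heap ByteBuffer may be a slice that does not begin at index 0 of its array. A self-contained illustration using only JDK behavior (class name is hypothetical):

    import java.nio.ByteBuffer;

    public class HeapBufferOffsetSketch {
      public static void main(String[] args) {
        ByteBuffer whole = ByteBuffer.allocate(16);
        whole.position(4);
        ByteBuffer window = whole.slice(); // shares the array; arrayOffset() == 4
        window.position(2);

        byte[] backing = window.array();
        int offset = window.arrayOffset() + window.position(); // 6 into backing
        int length = window.remaining();                       // 10
        System.out.println(offset + " of " + backing.length + ", len " + length);
      }
    }
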
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
deleted file mode 100644
index 49cc2c4..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.erasurecode.rawcoder;
-
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.io.erasurecode.ECChunk;
-
-import java.nio.ByteBuffer;
-
-/**
- * An abstract raw erasure encoder that's to be inherited by new encoders.
- *
- * It implements the {@link RawErasureEncoder} interface.
- */
-@InterfaceAudience.Private
-public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
-    implements RawErasureEncoder {
-
-  public AbstractRawErasureEncoder(int numDataUnits, int numParityUnits) {
-    super(numDataUnits, numParityUnits);
-  }
-
-  @Override
-  public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
-    checkParameters(inputs, outputs);
-
-    boolean usingDirectBuffer = inputs[0].isDirect();
-    int dataLen = inputs[0].remaining();
-    if (dataLen == 0) {
-      return;
-    }
-    checkParameterBuffers(inputs, false, dataLen, usingDirectBuffer, false);
-    checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true);
-
-    int[] inputPositions = new int[inputs.length];
-    for (int i = 0; i < inputPositions.length; i++) {
-      if (inputs[i] != null) {
-        inputPositions[i] = inputs[i].position();
-      }
-    }
-
-    if (usingDirectBuffer) {
-      doEncode(inputs, outputs);
-    } else {
-      int[] inputOffsets = new int[inputs.length];
-      int[] outputOffsets = new int[outputs.length];
-      byte[][] newInputs = new byte[inputs.length][];
-      byte[][] newOutputs = new byte[outputs.length][];
-
-      ByteBuffer buffer;
-      for (int i = 0; i < inputs.length; ++i) {
-        buffer = inputs[i];
-        inputOffsets[i] = buffer.arrayOffset() + buffer.position();
-        newInputs[i] = buffer.array();
-      }
-
-      for (int i = 0; i < outputs.length; ++i) {
-        buffer = outputs[i];
-        outputOffsets[i] = buffer.arrayOffset() + buffer.position();
-        newOutputs[i] = buffer.array();
-      }
-
-      doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets);
-    }
-
-    for (int i = 0; i < inputs.length; i++) {
-      if (inputs[i] != null) {
-        // dataLen bytes consumed
-        inputs[i].position(inputPositions[i] + dataLen);
-      }
-    }
-  }
-
-  /**
-   * Perform the real encoding work using direct ByteBuffer
-   * @param inputs Direct ByteBuffers expected
-   * @param outputs Direct ByteBuffers expected
-   */
-  protected abstract void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs);
-
-  @Override
-  public void encode(byte[][] inputs, byte[][] outputs) {
-    checkParameters(inputs, outputs);
-    int dataLen = inputs[0].length;
-    if (dataLen == 0) {
-      return;
-    }
-    checkParameterBuffers(inputs, false, dataLen, false);
-    checkParameterBuffers(outputs, false, dataLen, true);
-
-    int[] inputOffsets = new int[inputs.length]; // ALL ZERO
-    int[] outputOffsets = new int[outputs.length]; // ALL ZERO
-
-    doEncode(inputs, inputOffsets, dataLen, outputs, outputOffsets);
-  }
-
-  /**
-   * Perform the real encoding work using bytes array, supporting offsets
-   * and lengths.
-   * @param inputs the input byte arrays to read data from
-   * @param inputOffsets offsets for the input byte arrays to read data from
-   * @param dataLen how much data are to be read from
-   * @param outputs the output byte arrays to write resultant data into
-   * @param outputOffsets offsets from which to write resultant data into
-   */
-  protected abstract void doEncode(byte[][] inputs, int[] inputOffsets,
-                                   int dataLen, byte[][] outputs,
-                                   int[] outputOffsets);
-
-  @Override
-  public void encode(ECChunk[] inputs, ECChunk[] outputs) {
-    ByteBuffer[] newInputs = ECChunk.toBuffers(inputs);
-    ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs);
-    encode(newInputs, newOutputs);
-  }
-
-  /**
-   * Check and validate decoding parameters, throw exception accordingly.
-   * @param inputs input buffers to check
-   * @param outputs output buffers to check
-   */
-  protected <T> void checkParameters(T[] inputs, T[] outputs) {
-    if (inputs.length != getNumDataUnits()) {
-      throw new HadoopIllegalArgumentException("Invalid inputs length");
-    }
-    if (outputs.length != getNumParityUnits()) {
-      throw new HadoopIllegalArgumentException("Invalid outputs length");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayDecodingState.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayDecodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayDecodingState.java
new file mode 100644
index 0000000..69c084d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayDecodingState.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A utility class that maintains decoding state during a decode call using
+ * byte array inputs.
+ */
+@InterfaceAudience.Private
+class ByteArrayDecodingState extends DecodingState {
+  byte[][] inputs;
+  int[] inputOffsets;
+  int[] erasedIndexes;
+  byte[][] outputs;
+  int[] outputOffsets;
+
+  ByteArrayDecodingState(RawErasureDecoder decoder, byte[][] inputs,
+                         int[] erasedIndexes, byte[][] outputs) {
+    this.decoder = decoder;
+    this.inputs = inputs;
+    this.outputs = outputs;
+    this.erasedIndexes = erasedIndexes;
+    byte[] validInput = CoderUtil.findFirstValidInput(inputs);
+    this.decodeLength = validInput.length;
+
+    checkParameters(inputs, erasedIndexes, outputs);
+    checkInputBuffers(inputs);
+    checkOutputBuffers(outputs);
+
+    this.inputOffsets = new int[inputs.length]; // ALL ZERO
+    this.outputOffsets = new int[outputs.length]; // ALL ZERO
+  }
+
+  ByteArrayDecodingState(RawErasureDecoder decoder,
+                         int decodeLength,
+                         int[] erasedIndexes,
+                         byte[][] inputs,
+                         int[] inputOffsets,
+                         byte[][] outputs,
+                         int[] outputOffsets) {
+    this.decoder = decoder;
+    this.decodeLength = decodeLength;
+    this.erasedIndexes = erasedIndexes;
+    this.inputs = inputs;
+    this.outputs = outputs;
+    this.inputOffsets = inputOffsets;
+    this.outputOffsets = outputOffsets;
+  }
+
+  /**
+   * Check and ensure the buffers are of the desired length.
+   * @param buffers the buffers to check
+   */
+  void checkInputBuffers(byte[][] buffers) {
+    int validInputs = 0;
+
+    for (byte[] buffer : buffers) {
+      if (buffer == null) {
+        continue;
+      }
+
+      if (buffer.length != decodeLength) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer, not of length " + decodeLength);
+      }
+
+      validInputs++;
+    }
+
+    if (validInputs < decoder.getNumDataUnits()) {
+      throw new HadoopIllegalArgumentException(
+          "No enough valid inputs are provided, not recoverable");
+    }
+  }
+
+  /**
+   * Check and ensure the buffers are of the desired length.
+   * @param buffers the buffers to check
+   */
+  void checkOutputBuffers(byte[][] buffers) {
+    for (byte[] buffer : buffers) {
+      if (buffer == null) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer found, not allowing null");
+      }
+
+      if (buffer.length != decodeLength) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer not of length " + decodeLength);
+      }
+    }
+  }
+}

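The validity rules enforced here, together with the checkParameters the state inherits, are the MDS decoding contract: the inputs array spans all data plus parity units with nulls marking erasures, at most numParityUnits positions may be erased, and at least numDataUnits inputs must survive. Shaping the arrays for a hypothetical decoder built on a 6+3 schema looks like this (decoder creation elided; all buffers must share one length):

    public class DecodeContractSketch {
      public static void main(String[] args) {
        byte[][] inputs = new byte[9][];        // 6 data + 3 parity positions
        for (int i = 0; i < inputs.length; i++) {
          inputs[i] = new byte[1024];           // the surviving units
        }
        inputs[1] = null;                       // data unit 1 was erased
        inputs[7] = null;                       // parity unit 7 was erased
        int[] erasedIndexes = {1, 7};           // one entry per erased position
        byte[][] outputs = new byte[2][1024];   // reconstructed units land here
        // With a decoder built for the 6+3 schema:
        // decoder.decode(inputs, erasedIndexes, outputs);
      }
    }
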
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayEncodingState.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayEncodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayEncodingState.java
new file mode 100644
index 0000000..9d861d4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayEncodingState.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A utility class that maintains encoding state during an encode call using
+ * byte array inputs.
+ */
+@InterfaceAudience.Private
+class ByteArrayEncodingState extends EncodingState {
+  byte[][] inputs;
+  byte[][] outputs;
+  int[] inputOffsets;
+  int[] outputOffsets;
+
+  ByteArrayEncodingState(RawErasureEncoder encoder,
+                         byte[][] inputs, byte[][] outputs) {
+    this.encoder = encoder;
+    byte[] validInput = CoderUtil.findFirstValidInput(inputs);
+    this.encodeLength = validInput.length;
+    this.inputs = inputs;
+    this.outputs = outputs;
+
+    checkParameters(inputs, outputs);
+    checkBuffers(inputs);
+    checkBuffers(outputs);
+
+    this.inputOffsets = new int[inputs.length]; // ALL ZERO
+    this.outputOffsets = new int[outputs.length]; // ALL ZERO
+  }
+
+  ByteArrayEncodingState(RawErasureEncoder encoder,
+                         int encodeLength,
+                         byte[][] inputs,
+                         int[] inputOffsets,
+                         byte[][] outputs,
+                         int[] outputOffsets) {
+    this.encoder = encoder;
+    this.encodeLength = encodeLength;
+    this.inputs = inputs;
+    this.outputs = outputs;
+    this.inputOffsets = inputOffsets;
+    this.outputOffsets = outputOffsets;
+  }
+
+  /**
+   * Check and ensure the buffers are of the desired length.
+   * @param buffers the buffers to check
+   */
+  void checkBuffers(byte[][] buffers) {
+    for (byte[] buffer : buffers) {
+      if (buffer == null) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer found, not allowing null");
+      }
+
+      if (buffer.length != encodeLength) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer not of length " + encodeLength);
+      }
+    }
+  }
+}

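The encoding state mirrors this on the encode side, where the contract carried over from the deleted AbstractRawErasureEncoder checks is simpler: exactly numDataUnits equal-length inputs, exactly numParityUnits equal-length outputs, and no nulls. For the same hypothetical 6+3 schema:

    public class EncodeContractSketch {
      public static void main(String[] args) {
        byte[][] inputs = new byte[6][1024];   // one buffer per data unit
        byte[][] outputs = new byte[3][1024];  // one buffer per parity unit
        // With an encoder built for the 6+3 schema:
        // encoder.encode(inputs, outputs);
      }
    }

Note also the two-constructor pattern shared by both state classes: the public-path constructor validates, while the second, trusting constructor is reserved for already-checked conversions such as convertToByteArrayState.
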
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferDecodingState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferDecodingState.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferDecodingState.java
new file mode 100644
index 0000000..5c5b0f6
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferDecodingState.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A utility class that maintains decoding state during a decode call using
+ * ByteBuffer inputs.
+ */
+@InterfaceAudience.Private
+class ByteBufferDecodingState extends DecodingState {
+  ByteBuffer[] inputs;
+  ByteBuffer[] outputs;
+  int[] erasedIndexes;
+  boolean usingDirectBuffer;
+
+  ByteBufferDecodingState(RawErasureDecoder decoder, ByteBuffer[] inputs,
+                          int[] erasedIndexes, ByteBuffer[] outputs) {
+    this.decoder = decoder;
+    this.inputs = inputs;
+    this.outputs = outputs;
+    this.erasedIndexes = erasedIndexes;
+    ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs);
+    this.decodeLength = validInput.remaining();
+    this.usingDirectBuffer = validInput.isDirect();
+
+    checkParameters(inputs, erasedIndexes, outputs);
+    checkInputBuffers(inputs);
+    checkOutputBuffers(outputs);
+  }
+
+  /**
+   * Convert to a ByteArrayDecodingState when it's backed by on-heap arrays.
+   */
+  ByteArrayDecodingState convertToByteArrayState() {
+    int[] inputOffsets = new int[inputs.length];
+    int[] outputOffsets = new int[outputs.length];
+    byte[][] newInputs = new byte[inputs.length][];
+    byte[][] newOutputs = new byte[outputs.length][];
+
+    ByteBuffer buffer;
+    for (int i = 0; i < inputs.length; ++i) {
+      buffer = inputs[i];
+      if (buffer != null) {
+        inputOffsets[i] = buffer.arrayOffset() + buffer.position();
+        newInputs[i] = buffer.array();
+      }
+    }
+
+    for (int i = 0; i < outputs.length; ++i) {
+      buffer = outputs[i];
+      outputOffsets[i] = buffer.arrayOffset() + buffer.position();
+      newOutputs[i] = buffer.array();
+    }
+
+    ByteArrayDecodingState baState = new ByteArrayDecodingState(decoder,
+        decodeLength, erasedIndexes, newInputs,
+        inputOffsets, newOutputs, outputOffsets);
+    return baState;
+  }
+
+  /**
+   * Check and ensure the input buffers are of the desired length and type
+   * (direct buffers or not), and that enough non-null inputs are provided.
+   * @param buffers the buffers to check
+   */
+  void checkInputBuffers(ByteBuffer[] buffers) {
+    int validInputs = 0;
+
+    for (ByteBuffer buffer : buffers) {
+      if (buffer == null) {
+        continue;
+      }
+
+      if (buffer.remaining() != decodeLength) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer, not of length " + decodeLength);
+      }
+      if (buffer.isDirect() != usingDirectBuffer) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer, isDirect should be " + usingDirectBuffer);
+      }
+
+      validInputs++;
+    }
+
+    if (validInputs < decoder.getNumDataUnits()) {
+      throw new HadoopIllegalArgumentException(
+          "No enough valid inputs are provided, not recoverable");
+    }
+  }
+
+  /**
+   * Check and ensure the buffers are of the desired length and type
+   * (direct buffers or not).
+   * @param buffers the buffers to check
+   */
+  void checkOutputBuffers(ByteBuffer[] buffers) {
+    for (ByteBuffer buffer : buffers) {
+      if (buffer == null) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer found, not allowing null");
+      }
+
+      if (buffer.remaining() != decodeLength) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer, not of length " + decodeLength);
+      }
+      if (buffer.isDirect() != usingDirectBuffer) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer, isDirect should be " + usingDirectBuffer);
+      }
+    }
+  }
+}

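The usingDirectBuffer flag and convertToByteArrayState() above exist so the
decode entry point can keep direct buffers on the ByteBuffer path while
routing heap buffers onto the byte-array path. A hedged sketch of that
dispatch, assuming RawErasureDecoder exposes the two doDecode() overloads
seen in the coder diffs below; this is not a quote of the committed method:

    // Hypothetical dispatch inside RawErasureDecoder.decode().
    public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
                       ByteBuffer[] outputs) {
      ByteBufferDecodingState decodingState =
          new ByteBufferDecodingState(this, inputs, erasedIndexes, outputs);
      if (decodingState.usingDirectBuffer) {
        // Direct buffers: stay on the ByteBuffer path, native-coder friendly.
        doDecode(decodingState);
      } else {
        // Heap buffers: unwrap to arrays plus offsets, use the array path.
        doDecode(decodingState.convertToByteArrayState());
      }
    }
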
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferEncodingState.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferEncodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferEncodingState.java
new file mode 100644
index 0000000..7a10ac2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferEncodingState.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A utility class that maintains encoding state during an encode call using
+ * ByteBuffer inputs.
+ */
+@InterfaceAudience.Private
+class ByteBufferEncodingState extends EncodingState {
+  ByteBuffer[] inputs;
+  ByteBuffer[] outputs;
+  boolean usingDirectBuffer;
+
+  ByteBufferEncodingState(RawErasureEncoder encoder,
+                          ByteBuffer[] inputs, ByteBuffer[] outputs) {
+    this.encoder = encoder;
+    ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs);
+    this.encodeLength = validInput.remaining();
+    this.usingDirectBuffer = validInput.isDirect();
+    this.inputs = inputs;
+    this.outputs = outputs;
+
+    checkParameters(inputs, outputs);
+    checkBuffers(inputs);
+    checkBuffers(outputs);
+  }
+
+  /**
+   * Convert to a ByteArrayEncodingState when it's backed by on-heap arrays.
+   */
+  ByteArrayEncodingState convertToByteArrayState() {
+    int[] inputOffsets = new int[inputs.length];
+    int[] outputOffsets = new int[outputs.length];
+    byte[][] newInputs = new byte[inputs.length][];
+    byte[][] newOutputs = new byte[outputs.length][];
+
+    ByteBuffer buffer;
+    for (int i = 0; i < inputs.length; ++i) {
+      buffer = inputs[i];
+      inputOffsets[i] = buffer.arrayOffset() + buffer.position();
+      newInputs[i] = buffer.array();
+    }
+
+    for (int i = 0; i < outputs.length; ++i) {
+      buffer = outputs[i];
+      outputOffsets[i] = buffer.arrayOffset() + buffer.position();
+      newOutputs[i] = buffer.array();
+    }
+
+    ByteArrayEncodingState baeState = new ByteArrayEncodingState(encoder,
+        encodeLength, newInputs, inputOffsets, newOutputs, outputOffsets);
+    return baeState;
+  }
+
+  /**
+   * Check and ensure the buffers are of the desired length and type
+   * (direct buffers or not).
+   * @param buffers the buffers to check
+   */
+  void checkBuffers(ByteBuffer[] buffers) {
+    for (ByteBuffer buffer : buffers) {
+      if (buffer == null) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer found, not allowing null");
+      }
+
+      if (buffer.remaining() != encodeLength) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer, not of length " + encodeLength);
+      }
+      if (buffer.isDirect() != usingDirectBuffer) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer, isDirect should be " + usingDirectBuffer);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderOption.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderOption.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderOption.java
deleted file mode 100644
index e4d97ca..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderOption.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.erasurecode.rawcoder;
-
-/**
- * Supported erasure coder options.
- */
-public enum CoderOption {
-  /* If direct buffer is preferred, for perf consideration */
-  PREFER_DIRECT_BUFFER(true),    // READ-ONLY
-  /**
-   * Allow changing input buffer content (not positions).
-   * Maybe better perf if allowed
-   */
-  ALLOW_CHANGE_INPUTS(false),    // READ-WRITE
-  /* Allow dump verbose debug info or not */
-  ALLOW_VERBOSE_DUMP(false);     // READ-WRITE
-
-  private boolean isReadOnly = false;
-
-  CoderOption(boolean isReadOnly) {
-    this.isReadOnly = isReadOnly;
-  }
-
-  public boolean isReadOnly() {
-    return isReadOnly;
-  }
-};

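The three constants of the removed enum are superseded by the immutable
ErasureCoderOptions introduced in this change. A hedged sketch of the
mapping; the constructor arguments and accessor names are assumptions based
on the option semantics above, not quotes from the new class:

    // Options become immutable constructor state instead of a mutable enum.
    ErasureCoderOptions coderOptions =
        new ErasureCoderOptions(6, 3);  // numDataUnits, numParityUnits
    boolean changeInputsOk = coderOptions.allowChangeInputs(); // was ALLOW_CHANGE_INPUTS
    boolean verbose = coderOptions.allowVerboseDump();         // was ALLOW_VERBOSE_DUMP
    // PREFER_DIRECT_BUFFER was READ-ONLY: it becomes a property reported by
    // the coder itself rather than an option the caller can set.
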
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java
new file mode 100644
index 0000000..aceb3c6
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.erasurecode.ECChunk;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Helpful utilities for implementing some raw erasure coders.
+ */
+@InterfaceAudience.Private
+final class CoderUtil {
+
+  private CoderUtil() {
+    // Not to be instantiated
+  }
+
+  private static byte[] emptyChunk = new byte[4096];
+
+  /**
+   * Make sure to return an empty chunk buffer of at least the desired length.
+   * @param leastLength the minimum length the chunk must cover
+   * @return empty chunk of zero bytes
+   */
+  static byte[] getEmptyChunk(int leastLength) {
+    if (emptyChunk.length >= leastLength) {
+      return emptyChunk; // The common case
+    }
+
+    synchronized (CoderUtil.class) {
+      emptyChunk = new byte[leastLength];
+    }
+
+    return emptyChunk;
+  }
+
+  /**
+   * Ensure a buffer is filled with ZERO bytes from the current
+   * readable/writable position.
+   * @param buffer a buffer ready to read / write a certain number of bytes
+   * @param len number of bytes to fill with ZERO
+   * @return the buffer itself; position and limit are unchanged by the call
+   */
+  static ByteBuffer resetBuffer(ByteBuffer buffer, int len) {
+    int pos = buffer.position();
+    buffer.put(getEmptyChunk(len), 0, len);
+    buffer.position(pos);
+
+    return buffer;
+  }
+
+  /**
+   * Ensure the buffer (either input or output) is filled with ZERO bytes
+   * fully in the specified length of len, starting at offset.
+   * @param buffer bytes array buffer
+   * @return the buffer itself
+   */
+  static byte[] resetBuffer(byte[] buffer, int offset, int len) {
+    byte[] empty = getEmptyChunk(len);
+    System.arraycopy(empty, 0, buffer, offset, len);
+
+    return buffer;
+  }
+
+  /**
+   * Initialize the output buffers with ZERO bytes.
+   * @param buffers the output buffers to initialize
+   * @param dataLen the length in bytes to fill with ZERO
+   */
+  static void resetOutputBuffers(ByteBuffer[] buffers, int dataLen) {
+    for (ByteBuffer buffer : buffers) {
+      resetBuffer(buffer, dataLen);
+    }
+  }
+
+  /**
+   * Initialize the output buffers with ZERO bytes.
+   * @param buffers the output buffers to initialize
+   * @param offsets start positions in the buffers to fill from
+   * @param dataLen the length in bytes to fill with ZERO
+   */
+  static void resetOutputBuffers(byte[][] buffers, int[] offsets,
+                                 int dataLen) {
+    for (int i = 0; i < buffers.length; i++) {
+      resetBuffer(buffers[i], offsets[i], dataLen);
+    }
+  }
+
+  /**
+   * Convert an array of ECChunks to an array of ByteBuffers.
+   * @param chunks chunks to convert into buffers
+   * @return an array of ByteBuffers
+   */
+  static ByteBuffer[] toBuffers(ECChunk[] chunks) {
+    ByteBuffer[] buffers = new ByteBuffer[chunks.length];
+
+    ECChunk chunk;
+    for (int i = 0; i < chunks.length; i++) {
+      chunk = chunks[i];
+      if (chunk == null) {
+        buffers[i] = null;
+      } else {
+        buffers[i] = chunk.getBuffer();
+      }
+    }
+
+    return buffers;
+  }
+
+  /**
+   * Clone an input bytes array as direct ByteBuffer.
+   * @param input the input bytes array, possibly null if erased
+   * @param offset start position in the input array
+   * @param len number of bytes to clone
+   * @return direct ByteBuffer
+   */
+  static ByteBuffer cloneAsDirectByteBuffer(byte[] input, int offset, int len) {
+    if (input == null) { // an input can be null, if erased or not to read
+      return null;
+    }
+
+    ByteBuffer directBuffer = ByteBuffer.allocateDirect(len);
+    directBuffer.put(input, offset, len);
+    directBuffer.flip();
+    return directBuffer;
+  }
+
+  /**
+   * Get indexes array for items marked as null, either erased or
+   * not to read.
+   * @param inputs the input buffers to scan
+   * @return indexes array of the null items
+   */
+  static <T> int[] getNullIndexes(T[] inputs) {
+    int[] nullIndexes = new int[inputs.length];
+    int idx = 0;
+    for (int i = 0; i < inputs.length; i++) {
+      if (inputs[i] == null) {
+        nullIndexes[idx++] = i;
+      }
+    }
+
+    return Arrays.copyOf(nullIndexes, idx);
+  }
+
+  /**
+   * Find the first valid input from all the inputs.
+   * @param inputs input buffers to look for valid input
+   * @return the first valid input
+   */
+  static <T> T findFirstValidInput(T[] inputs) {
+    if (inputs.length > 0 && inputs[0] != null) {
+      return inputs[0];
+    }
+
+    for (T input : inputs) {
+      if (input != null) {
+        return input;
+      }
+    }
+
+    throw new HadoopIllegalArgumentException(
+        "Invalid inputs are found, all being null");
+  }
+
+  /**
+   * Pick up the indexes of the valid (non-null) inputs.
+   * @param inputs decoding input buffers
+   * @param <T> the buffer element type
+   * @return indexes array of the valid inputs
+   */
+  static <T> int[] getValidIndexes(T[] inputs) {
+    int[] validIndexes = new int[inputs.length];
+    int idx = 0;
+    for (int i = 0; i < inputs.length; i++) {
+      if (inputs[i] != null) {
+        validIndexes[idx++] = i;
+      }
+    }
+
+    return Arrays.copyOf(validIndexes, idx);
+  }
+}

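To make the null-input conventions concrete, here is an illustrative use of
the three index helpers for a hypothetical (6, 3) stripe with two erased
units. CoderUtil is package-private, so this only compiles inside the
rawcoder package:

    // Expected behavior of the index helpers, illustratively.
    ByteBuffer[] inputs = new ByteBuffer[9];          // 6 data + 3 parity units
    for (int i = 0; i < inputs.length; i++) {
      inputs[i] = (i == 1 || i == 4) ? null : ByteBuffer.allocate(1024);
    }
    int[] erased = CoderUtil.getNullIndexes(inputs);  // {1, 4}
    int[] valid = CoderUtil.getValidIndexes(inputs);  // {0, 2, 3, 5, 6, 7, 8}
    ByteBuffer first = CoderUtil.findFirstValidInput(inputs); // inputs[0]
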
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingState.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingState.java
new file mode 100644
index 0000000..4b693a4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingState.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A utility class that maintains decoding state during a decode call.
+ */
+@InterfaceAudience.Private
+class DecodingState {
+  RawErasureDecoder decoder;
+  int decodeLength;
+
+  /**
+   * Check and validate decoding parameters, throw exception accordingly. The
+   * checking assumes it's an MDS code. Other codes can override this.
+   * @param inputs input buffers to check
+   * @param erasedIndexes indexes of erased units in the inputs array
+   * @param outputs output buffers to check
+   */
+  <T> void checkParameters(T[] inputs, int[] erasedIndexes,
+                           T[] outputs) {
+    if (inputs.length != decoder.getNumParityUnits() +
+        decoder.getNumDataUnits()) {
+      throw new HadoopIllegalArgumentException("Invalid inputs length");
+    }
+
+    if (erasedIndexes.length != outputs.length) {
+      throw new HadoopIllegalArgumentException(
+          "erasedIndexes and outputs mismatch in length");
+    }
+
+    if (erasedIndexes.length > decoder.getNumParityUnits()) {
+      throw new HadoopIllegalArgumentException(
+          "Too many erased, not recoverable");
+    }
+  }
+}

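Concretely, for an RS(6, 3) decoder the checks above admit at most three
erasures and demand one output per erased unit. An illustrative call, where
state is a hypothetical DecodingState whose decoder reports 6 data and 3
parity units:

    // What checkParameters() enforces for RS(6, 3):
    //   inputs.length        == 6 + 3 = 9  (data + parity units)
    //   outputs.length       == erasedIndexes.length
    //   erasedIndexes.length <= 3          (numParityUnits)
    ByteBuffer[] inputs = new ByteBuffer[9];
    int[] erasedIndexes = {1, 4};             // two erasures, recoverable
    ByteBuffer[] outputs = new ByteBuffer[2]; // one output per erased unit
    state.checkParameters(inputs, erasedIndexes, outputs); // passes
    // erasedIndexes of {0, 1, 2, 3} would throw: too many erased units.
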
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java
index 25dfa57..256a725 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java
@@ -18,8 +18,7 @@
 package org.apache.hadoop.io.erasurecode.rawcoder;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-
-import java.nio.ByteBuffer;
+import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 
 /**
  * A dummy raw decoder that does no real computation.
@@ -28,20 +27,19 @@ import java.nio.ByteBuffer;
  * instead of codec, and is intended for test only.
  */
 @InterfaceAudience.Private
-public class DummyRawDecoder extends AbstractRawErasureDecoder {
-  public DummyRawDecoder(int numDataUnits, int numParityUnits) {
-    super(numDataUnits, numParityUnits);
+public class DummyRawDecoder extends RawErasureDecoder {
+
+  public DummyRawDecoder(ErasureCoderOptions coderOptions) {
+    super(coderOptions);
   }
 
   @Override
-  protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
-      ByteBuffer[] outputs) {
+  protected void doDecode(ByteBufferDecodingState decodingState) {
     // Nothing to do. Output buffers have already been reset
   }
 
   @Override
-  protected void doDecode(byte[][] inputs, int[] inputOffsets, int dataLen,
-      int[] erasedIndexes, byte[][] outputs, int[] outputOffsets) {
+  protected void doDecode(ByteArrayDecodingState decodingState) {
     // Nothing to do. Output buffers have already been reset
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java
index 33e026d..558e350 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java
@@ -18,8 +18,7 @@
 package org.apache.hadoop.io.erasurecode.rawcoder;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-
-import java.nio.ByteBuffer;
+import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 
 /**
  * A dummy raw encoder that does no real computation.
@@ -28,19 +27,19 @@ import java.nio.ByteBuffer;
  * instead of codec, and is intended for test only.
  */
 @InterfaceAudience.Private
-public class DummyRawEncoder extends AbstractRawErasureEncoder {
-  public DummyRawEncoder(int numDataUnits, int numParityUnits) {
-    super(numDataUnits, numParityUnits);
+public class DummyRawEncoder extends RawErasureEncoder {
+
+  public DummyRawEncoder(ErasureCoderOptions coderOptions) {
+    super(coderOptions);
   }
 
   @Override
-  protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
+  protected void doEncode(ByteArrayEncodingState encodingState) {
     // Nothing to do. Output buffers have already been reset
   }
 
   @Override
-  protected void doEncode(byte[][] inputs, int[] inputOffsets, int dataLen,
-      byte[][] outputs, int[] outputOffsets) {
+  protected void doEncode(ByteBufferEncodingState encodingState) {
     // Nothing to do. Output buffers have already been reset
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java
index 73457c2..31ba4ef 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java
@@ -18,19 +18,21 @@
 package org.apache.hadoop.io.erasurecode.rawcoder;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 
 /**
  * A raw erasure coder factory for dummy raw coders.
  */
 @InterfaceAudience.Private
 public class DummyRawErasureCoderFactory implements RawErasureCoderFactory {
+
   @Override
-  public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) {
-    return new DummyRawEncoder(numDataUnits, numParityUnits);
+  public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
+    return new DummyRawEncoder(coderOptions);
   }
 
   @Override
-  public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) {
-    return new DummyRawDecoder(numDataUnits, numParityUnits);
+  public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
+    return new DummyRawDecoder(coderOptions);
   }
 }

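For callers, the net effect of the reworked factory interface is a single
options object in place of the old (numDataUnits, numParityUnits) pair. A
hedged usage sketch; the two-argument ErasureCoderOptions constructor is an
assumption consistent with the diffs in this commit:

    // Creating coders through the new factory signatures.
    ErasureCoderOptions coderOptions = new ErasureCoderOptions(6, 3);
    RawErasureCoderFactory factory = new DummyRawErasureCoderFactory();
    RawErasureEncoder encoder = factory.createEncoder(coderOptions);
    RawErasureDecoder decoder = factory.createDecoder(coderOptions);
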
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/EncodingState.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/EncodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/EncodingState.java
new file mode 100644
index 0000000..a8946d2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/EncodingState.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A utility class that maintains encoding state during an encode call.
+ */
+@InterfaceAudience.Private
+abstract class EncodingState {
+  RawErasureEncoder encoder;
+  int encodeLength;
+
+  /**
+   * Check and validate encoding parameters, throw exception accordingly.
+   * @param inputs input buffers to check
+   * @param outputs output buffers to check
+   */
+  <T> void checkParameters(T[] inputs, T[] outputs) {
+    if (inputs.length != encoder.getNumDataUnits()) {
+      throw new HadoopIllegalArgumentException("Invalid inputs length");
+    }
+    if (outputs.length != encoder.getNumParityUnits()) {
+      throw new HadoopIllegalArgumentException("Invalid outputs length");
+    }
+  }
+}

