Updated Branches:
  refs/heads/flume-1.4 ee2f6c0ba -> 9691e3639

FLUME-1285 - FileChannel has a dependency on Hadoop IO classes

(Israel Ekpo and Christopher Nagy via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9691e363
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9691e363
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9691e363

Branch: refs/heads/flume-1.4
Commit: 9691e3639fd000fdb392f33ab6649e57e0ffc424
Parents: ee2f6c0
Author: Brock Noland <[email protected]>
Authored: Sun Jun 23 15:16:36 2013 -0500
Committer: Brock Noland <[email protected]>
Committed: Sun Jun 23 15:16:49 2013 -0500

----------------------------------------------------------------------
 flume-ng-channels/flume-file-channel/pom.xml    |  24 ---
 .../apache/flume/channel/file/FlumeEvent.java   | 126 ++++++++++++----
 .../channel/file/TransactionEventRecord.java    |   1 -
 .../org/apache/flume/channel/file/Writable.java |  47 ++++++
 .../flume/channel/file/WritableUtils.java       | 150 +++++++++++++++++++
 .../apache/flume/channel/file/TestUtils.java    |   1 -
 6 files changed, 294 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/9691e363/flume-ng-channels/flume-file-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/pom.xml 
b/flume-ng-channels/flume-file-channel/pom.xml
index 2408447..0f44c68 100644
--- a/flume-ng-channels/flume-file-channel/pom.xml
+++ b/flume-ng-channels/flume-file-channel/pom.xml
@@ -97,12 +97,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>${hadoop.common.artifact.id}</artifactId>
-      <optional>true</optional>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <scope>compile</scope>
@@ -112,24 +106,6 @@
 
   <profiles>
 
-    <profile>
-      <id>hadoop-2</id>
-      <activation>
-        <property>
-          <name>hadoop.profile</name>
-          <value>2</value>
-        </property>
-      </activation>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-auth</artifactId>
-          <optional>true</optional>
-        </dependency>
-      </dependencies>
-    </profile>
-
-
    <profile>
      <id>compile-proto</id>
      <build>

http://git-wip-us.apache.org/repos/asf/flume/blob/9691e363/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
index c447335..53c1251 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
@@ -21,20 +21,44 @@ package org.apache.flume.channel.file;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CodingErrorAction;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flume.Event;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Maps;
 
 /**
  * Persistable wrapper for Event
  */
 class FlumeEvent implements Event, Writable {
 
+  private static final byte EVENT_MAP_TEXT_WRITABLE_ID = 
Byte.valueOf(Integer.valueOf(-116).byteValue());
+
+  private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY =
+      new ThreadLocal<CharsetEncoder>() {
+    @Override
+    protected CharsetEncoder initialValue() {
+      return Charset.forName("UTF-8").newEncoder().
+          onMalformedInput(CodingErrorAction.REPLACE).
+          onUnmappableCharacter(CodingErrorAction.REPLACE);
+    }
+  };
+
+  private static ThreadLocal<CharsetDecoder> DECODER_FACTORY =
+      new ThreadLocal<CharsetDecoder>() {
+    @Override
+    protected CharsetDecoder initialValue() {
+      return Charset.forName("UTF-8").newDecoder().
+          onMalformedInput(CodingErrorAction.REPLACE).
+          onUnmappableCharacter(CodingErrorAction.REPLACE);
+    }
+  };
+
   private Map<String, String> headers;
   private byte[] body;
 
@@ -69,8 +93,34 @@ class FlumeEvent implements Event, Writable {
 
   @Override
   public void write(DataOutput out) throws IOException {
-    MapWritable map = toMapWritable(getHeaders());
-    map.write(out);
+
+    out.writeByte(0);
+
+    Map<String,String> writeHeaders = getHeaders();
+    if (null != writeHeaders) {
+      out.writeInt(headers.size());
+
+      CharsetEncoder encoder = ENCODER_FACTORY.get();
+
+      for (String key : headers.keySet()) {
+        out.writeByte(EVENT_MAP_TEXT_WRITABLE_ID);
+        ByteBuffer keyBytes = 
encoder.encode(CharBuffer.wrap(key.toCharArray()));
+        int keyLength = keyBytes.limit();
+        WritableUtils.writeVInt(out, keyLength);
+        out.write(keyBytes.array(), 0, keyLength);
+
+        String value = headers.get(key);
+        out.write(EVENT_MAP_TEXT_WRITABLE_ID);
+        ByteBuffer valueBytes = 
encoder.encode(CharBuffer.wrap(value.toCharArray()));
+        int valueLength = valueBytes.limit();
+        WritableUtils.writeVInt(out, valueLength );
+        out.write(valueBytes.array(), 0, valueLength);
+      }
+    }
+    else {
+      out.writeInt( 0 );
+    }
+
     byte[] body = getBody();
     if(body == null) {
       out.writeInt(-1);
@@ -83,9 +133,45 @@ class FlumeEvent implements Event, Writable {
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    MapWritable map = new MapWritable();
-    map.readFields(in);
-    setHeaders(fromMapWritable(map));
+
+    // newClasses from AbstractMapWritable in Hadoop Common
+    byte newClasses = in.readByte();
+
+    // skip over newClasses since only Text is used
+    for (byte i = 0; i < newClasses; i++) {
+      in.readByte();
+      in.readUTF();
+    }
+
+    Map<String,String> newHeaders = new HashMap<String,String>();
+
+    int numEntries = in.readInt();
+
+    CharsetDecoder decoder = DECODER_FACTORY.get();
+
+    for (int i = 0; i < numEntries; i++) {
+
+      byte keyClassId = in.readByte();
+      assert (keyClassId == EVENT_MAP_TEXT_WRITABLE_ID);
+      int keyLength = WritableUtils.readVInt(in);
+      byte[] keyBytes = new byte[ keyLength ];
+
+      in.readFully( keyBytes, 0, keyLength );
+      String key = decoder.decode( ByteBuffer.wrap(keyBytes) ).toString();
+
+      byte valueClassId = in.readByte();
+      assert (valueClassId == EVENT_MAP_TEXT_WRITABLE_ID);
+      int valueLength = WritableUtils.readVInt(in);
+      byte[] valueBytes = new byte[ valueLength ];
+
+      in.readFully(valueBytes, 0, valueLength);
+      String value = decoder.decode(ByteBuffer.wrap(valueBytes)).toString();
+
+      newHeaders.put(key,  value);
+    }
+
+    setHeaders(newHeaders);
+
     byte[] body = null;
     int bodyLength = in.readInt();
     if(bodyLength != -1) {
@@ -94,27 +180,9 @@ class FlumeEvent implements Event, Writable {
     }
     setBody(body);
   }
-  private MapWritable toMapWritable(Map<String, String> map) {
-    MapWritable result = new MapWritable();
-    if(map != null) {
-      for(Map.Entry<String, String> entry : map.entrySet()) {
-        result.put(new Text(entry.getKey()),new Text(entry.getValue()));
-      }
-    }
-    return result;
-  }
-  private Map<String, String> fromMapWritable(MapWritable map) {
-    Map<String, String> result = Maps.newHashMap();
-    if(map != null) {
-      for(Map.Entry<Writable, Writable> entry : map.entrySet()) {
-        result.put(entry.getKey().toString(),entry.getValue().toString());
-      }
-    }
-    return result;
-  }
   static FlumeEvent from(DataInput in) throws IOException {
     FlumeEvent event = new FlumeEvent();
     event.readFields(in);
     return event;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9691e363/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
index dda9b3f..ea7f00c 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
@@ -32,7 +32,6 @@ import java.nio.ByteBuffer;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.channel.file.proto.ProtosFactory;
-import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/9691e363/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Writable.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Writable.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Writable.java
new file mode 100644
index 0000000..5e0ab6d
--- /dev/null
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Writable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel.file;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Defines methods for reading from or writing to streams <p>
+ *
+ * Based on org.apache.hadoop.io.Writable
+ */
+interface Writable {
+  /**
+   * Serialize the fields of this object to <code>out</code>
+   *
+   * @param out <code>DataOutput</code> to serialize this object into.
+   * @throws IOException
+   */
+  public void write(DataOutput out) throws IOException;
+
+  /**
+   * Deserialize the fields of this object from <code>in</code>
+   *
+   * @param in <code>DataInput</code> to deserialize this object from.
+   * @throws IOException
+   */
+  public void readFields(DataInput in) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9691e363/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java
new file mode 100644
index 0000000..69072db
--- /dev/null
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Util methods copied from org.apache.hadoop.io.WritableUtils.
+ */
+class WritableUtils {
+
+  /**
+   * Serializes an integer to a binary stream with zero-compressed encoding.
+   * For -120 <= i <= 127, only one byte is used with the actual value.
+   * For other values of i, the first byte value indicates whether the
+   * integer is positive or negative, and the number of bytes that follow.
+   * If the first byte value v is between -121 and -124, the following integer
+   * is positive, with number of bytes that follow are -(v+120).
+   * If the first byte value v is between -125 and -128, the following integer
+   * is negative, with number of bytes that follow are -(v+124). Bytes are
+   * stored in the high-non-zero-byte-first order.
+   *
+   * @param stream Binary output stream
+   * @param i Integer to be serialized
+   * @throws java.io.IOException
+   */
+  public static void writeVInt(DataOutput stream, int i) throws IOException {
+    writeVLong(stream, i);
+  }
+
+  /**
+   * Serializes a long to a binary stream with zero-compressed encoding.
+   * For -112 <= i <= 127, only one byte is used with the actual value.
+   * For other values of i, the first byte value indicates whether the
+   * long is positive or negative, and the number of bytes that follow.
+   * If the first byte value v is between -113 and -120, the following long
+   * is positive, with number of bytes that follow are -(v+112).
+   * If the first byte value v is between -121 and -128, the following long
+   * is negative, with number of bytes that follow are -(v+120). Bytes are
+   * stored in the high-non-zero-byte-first order.
+   *
+   * @param stream Binary output stream
+   * @param i Long to be serialized
+   * @throws java.io.IOException
+   */
+  public static void writeVLong(DataOutput stream, long i) throws IOException {
+    if (i >= -112 && i <= 127) {
+      stream.writeByte((byte)i);
+      return;
+    }
+
+    int len = -112;
+    if (i < 0) {
+      i ^= -1L; // take one's complement'
+      len = -120;
+    }
+
+    long tmp = i;
+    while (tmp != 0) {
+      tmp = tmp >> 8;
+    len--;
+    }
+
+    stream.writeByte((byte)len);
+
+    len = (len < -120) ? -(len + 120) : -(len + 112);
+
+    for (int idx = len; idx != 0; idx--) {
+      int shiftbits = (idx - 1) * 8;
+      long mask = 0xFFL << shiftbits;
+      stream.writeByte((byte)((i & mask) >> shiftbits));
+    }
+  }
+
+  /**
+   * Reads a zero-compressed encoded long from input stream and returns it.
+   * @param stream Binary input stream
+   * @throws java.io.IOException
+   * @return deserialized long from stream.
+   */
+  public static long readVLong(DataInput stream) throws IOException {
+    byte firstByte = stream.readByte();
+    int len = decodeVIntSize(firstByte);
+    if (len == 1) {
+      return firstByte;
+    }
+    long i = 0;
+    for (int idx = 0; idx < len-1; idx++) {
+      byte b = stream.readByte();
+      i = i << 8;
+      i = i | (b & 0xFF);
+    }
+    return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+  }
+
+  /**
+   * Reads a zero-compressed encoded integer from input stream and returns it.
+   * @param stream Binary input stream
+   * @throws java.io.IOException
+   * @return deserialized integer from stream.
+   */
+  public static int readVInt(DataInput stream) throws IOException {
+    long n = readVLong(stream);
+    if ((n > Integer.MAX_VALUE) || (n < Integer.MIN_VALUE)) {
+      throw new IOException("value too long to fit in integer");
+    }
+    return (int)n;
+  }
+
+  /**
+   * Given the first byte of a vint/vlong, determine the sign
+   * @param value the first byte
+   * @return is the value negative
+   */
+  public static boolean isNegativeVInt(byte value) {
+    return value < -120 || (value >= -112 && value < 0);
+  }
+
+  /**
+   * Parse the first byte of a vint/vlong to determine the number of bytes
+   * @param value the first byte of the vint/vlong
+   * @return the total number of bytes (1 to 9)
+   */
+  public static int decodeVIntSize(byte value) {
+    if (value >= -112) {
+      return 1;
+    } else if (value < -120) {
+      return -119 - value;
+    }
+    return -111 - value;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9691e363/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index 75e118e..0fb9bc2 100644
--- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -43,7 +43,6 @@ import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
-import org.apache.hadoop.io.Writable;
 import org.junit.Assert;
 
 import com.google.common.base.Charsets;

Reply via email to