Updated Branches: refs/heads/trunk b8c4b003a -> 97170aebf
FLUME-1285 - FileChannel has a dependency on Hadoop IO classes (Israel Ekpo and Christopher Nagy via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/97170aeb Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/97170aeb Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/97170aeb Branch: refs/heads/trunk Commit: 97170aebf4240e1f7c628650b621f3f4c3178dc0 Parents: b8c4b00 Author: Brock Noland <[email protected]> Authored: Sun Jun 23 15:16:36 2013 -0500 Committer: Brock Noland <[email protected]> Committed: Sun Jun 23 15:16:36 2013 -0500 ---------------------------------------------------------------------- flume-ng-channels/flume-file-channel/pom.xml | 24 --- .../apache/flume/channel/file/FlumeEvent.java | 126 ++++++++++++---- .../channel/file/TransactionEventRecord.java | 1 - .../org/apache/flume/channel/file/Writable.java | 47 ++++++ .../flume/channel/file/WritableUtils.java | 150 +++++++++++++++++++ .../apache/flume/channel/file/TestUtils.java | 1 - 6 files changed, 294 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/97170aeb/flume-ng-channels/flume-file-channel/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/pom.xml b/flume-ng-channels/flume-file-channel/pom.xml index 2408447..0f44c68 100644 --- a/flume-ng-channels/flume-file-channel/pom.xml +++ b/flume-ng-channels/flume-file-channel/pom.xml @@ -97,12 +97,6 @@ </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>${hadoop.common.artifact.id}</artifactId> - <optional>true</optional> - </dependency> - - <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <scope>compile</scope> @@ -112,24 +106,6 @@ <profiles> - <profile> - <id>hadoop-2</id> - <activation> - <property> - <name>hadoop.profile</name> - <value>2</value> - </property> - </activation> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-auth</artifactId> - <optional>true</optional> - </dependency> - </dependencies> - </profile> - - <profile> <id>compile-proto</id> <build> http://git-wip-us.apache.org/repos/asf/flume/blob/97170aeb/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java index c447335..53c1251 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java @@ -21,20 +21,44 @@ package org.apache.flume.channel.file; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CodingErrorAction; +import java.util.HashMap; import java.util.Map; import org.apache.flume.Event; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; - -import com.google.common.collect.Maps; /** * Persistable wrapper for Event */ class FlumeEvent implements Event, Writable { + private static final byte EVENT_MAP_TEXT_WRITABLE_ID = Byte.valueOf(Integer.valueOf(-116).byteValue()); + + private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = + new ThreadLocal<CharsetEncoder>() { + @Override + protected CharsetEncoder initialValue() { + return Charset.forName("UTF-8").newEncoder(). + onMalformedInput(CodingErrorAction.REPLACE). + onUnmappableCharacter(CodingErrorAction.REPLACE); + } + }; + + private static ThreadLocal<CharsetDecoder> DECODER_FACTORY = + new ThreadLocal<CharsetDecoder>() { + @Override + protected CharsetDecoder initialValue() { + return Charset.forName("UTF-8").newDecoder(). + onMalformedInput(CodingErrorAction.REPLACE). + onUnmappableCharacter(CodingErrorAction.REPLACE); + } + }; + private Map<String, String> headers; private byte[] body; @@ -69,8 +93,34 @@ class FlumeEvent implements Event, Writable { @Override public void write(DataOutput out) throws IOException { - MapWritable map = toMapWritable(getHeaders()); - map.write(out); + + out.writeByte(0); + + Map<String,String> writeHeaders = getHeaders(); + if (null != writeHeaders) { + out.writeInt(headers.size()); + + CharsetEncoder encoder = ENCODER_FACTORY.get(); + + for (String key : headers.keySet()) { + out.writeByte(EVENT_MAP_TEXT_WRITABLE_ID); + ByteBuffer keyBytes = encoder.encode(CharBuffer.wrap(key.toCharArray())); + int keyLength = keyBytes.limit(); + WritableUtils.writeVInt(out, keyLength); + out.write(keyBytes.array(), 0, keyLength); + + String value = headers.get(key); + out.write(EVENT_MAP_TEXT_WRITABLE_ID); + ByteBuffer valueBytes = encoder.encode(CharBuffer.wrap(value.toCharArray())); + int valueLength = valueBytes.limit(); + WritableUtils.writeVInt(out, valueLength ); + out.write(valueBytes.array(), 0, valueLength); + } + } + else { + out.writeInt( 0 ); + } + byte[] body = getBody(); if(body == null) { out.writeInt(-1); @@ -83,9 +133,45 @@ class FlumeEvent implements Event, Writable { @Override public void readFields(DataInput in) throws IOException { - MapWritable map = new MapWritable(); - map.readFields(in); - setHeaders(fromMapWritable(map)); + + // newClasses from AbstractMapWritable in Hadoop Common + byte newClasses = in.readByte(); + + // skip over newClasses since only Text is used + for (byte i = 0; i < newClasses; i++) { + in.readByte(); + in.readUTF(); + } + + Map<String,String> newHeaders = new HashMap<String,String>(); + + int numEntries = in.readInt(); + + CharsetDecoder decoder = DECODER_FACTORY.get(); + + for (int i = 0; i < numEntries; i++) { + + byte keyClassId = in.readByte(); + assert (keyClassId == EVENT_MAP_TEXT_WRITABLE_ID); + int keyLength = WritableUtils.readVInt(in); + byte[] keyBytes = new byte[ keyLength ]; + + in.readFully( keyBytes, 0, keyLength ); + String key = decoder.decode( ByteBuffer.wrap(keyBytes) ).toString(); + + byte valueClassId = in.readByte(); + assert (valueClassId == EVENT_MAP_TEXT_WRITABLE_ID); + int valueLength = WritableUtils.readVInt(in); + byte[] valueBytes = new byte[ valueLength ]; + + in.readFully(valueBytes, 0, valueLength); + String value = decoder.decode(ByteBuffer.wrap(valueBytes)).toString(); + + newHeaders.put(key, value); + } + + setHeaders(newHeaders); + byte[] body = null; int bodyLength = in.readInt(); if(bodyLength != -1) { @@ -94,27 +180,9 @@ class FlumeEvent implements Event, Writable { } setBody(body); } - private MapWritable toMapWritable(Map<String, String> map) { - MapWritable result = new MapWritable(); - if(map != null) { - for(Map.Entry<String, String> entry : map.entrySet()) { - result.put(new Text(entry.getKey()),new Text(entry.getValue())); - } - } - return result; - } - private Map<String, String> fromMapWritable(MapWritable map) { - Map<String, String> result = Maps.newHashMap(); - if(map != null) { - for(Map.Entry<Writable, Writable> entry : map.entrySet()) { - result.put(entry.getKey().toString(),entry.getValue().toString()); - } - } - return result; - } static FlumeEvent from(DataInput in) throws IOException { FlumeEvent event = new FlumeEvent(); event.readFields(in); return event; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flume/blob/97170aeb/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java index dda9b3f..ea7f00c 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java @@ -32,7 +32,6 @@ import java.nio.ByteBuffer; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.channel.file.proto.ProtosFactory; -import org.apache.hadoop.io.Writable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/flume/blob/97170aeb/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Writable.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Writable.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Writable.java new file mode 100644 index 0000000..5e0ab6d --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Writable.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.channel.file; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Defines methods for reading from or writing to streams <p> + * + * Based on org.apache.hadoop.io.Writable + */ +interface Writable { + /** + * Serialize the fields of this object to <code>out</code> + * + * @param out <code>DataOutput</code> to serialize this object into. + * @throws IOException + */ + public void write(DataOutput out) throws IOException; + + /** + * Deserialize the fields of this object from <code>in</code> + * + * @param in <code>DataInput</code> to deserialize this object from. + * @throws IOException + */ + public void readFields(DataInput in) throws IOException; +} http://git-wip-us.apache.org/repos/asf/flume/blob/97170aeb/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java new file mode 100644 index 0000000..69072db --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.channel.file; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Util methods copied from org.apache.hadoop.io.WritableUtils. + */ +class WritableUtils { + + /** + * Serializes an integer to a binary stream with zero-compressed encoding. + * For -120 <= i <= 127, only one byte is used with the actual value. + * For other values of i, the first byte value indicates whether the + * integer is positive or negative, and the number of bytes that follow. + * If the first byte value v is between -121 and -124, the following integer + * is positive, with number of bytes that follow are -(v+120). + * If the first byte value v is between -125 and -128, the following integer + * is negative, with number of bytes that follow are -(v+124). Bytes are + * stored in the high-non-zero-byte-first order. + * + * @param stream Binary output stream + * @param i Integer to be serialized + * @throws java.io.IOException + */ + public static void writeVInt(DataOutput stream, int i) throws IOException { + writeVLong(stream, i); + } + + /** + * Serializes a long to a binary stream with zero-compressed encoding. + * For -112 <= i <= 127, only one byte is used with the actual value. + * For other values of i, the first byte value indicates whether the + * long is positive or negative, and the number of bytes that follow. + * If the first byte value v is between -113 and -120, the following long + * is positive, with number of bytes that follow are -(v+112). + * If the first byte value v is between -121 and -128, the following long + * is negative, with number of bytes that follow are -(v+120). Bytes are + * stored in the high-non-zero-byte-first order. + * + * @param stream Binary output stream + * @param i Long to be serialized + * @throws java.io.IOException + */ + public static void writeVLong(DataOutput stream, long i) throws IOException { + if (i >= -112 && i <= 127) { + stream.writeByte((byte)i); + return; + } + + int len = -112; + if (i < 0) { + i ^= -1L; // take one's complement' + len = -120; + } + + long tmp = i; + while (tmp != 0) { + tmp = tmp >> 8; + len--; + } + + stream.writeByte((byte)len); + + len = (len < -120) ? -(len + 120) : -(len + 112); + + for (int idx = len; idx != 0; idx--) { + int shiftbits = (idx - 1) * 8; + long mask = 0xFFL << shiftbits; + stream.writeByte((byte)((i & mask) >> shiftbits)); + } + } + + /** + * Reads a zero-compressed encoded long from input stream and returns it. + * @param stream Binary input stream + * @throws java.io.IOException + * @return deserialized long from stream. + */ + public static long readVLong(DataInput stream) throws IOException { + byte firstByte = stream.readByte(); + int len = decodeVIntSize(firstByte); + if (len == 1) { + return firstByte; + } + long i = 0; + for (int idx = 0; idx < len-1; idx++) { + byte b = stream.readByte(); + i = i << 8; + i = i | (b & 0xFF); + } + return (isNegativeVInt(firstByte) ? (i ^ -1L) : i); + } + + /** + * Reads a zero-compressed encoded integer from input stream and returns it. + * @param stream Binary input stream + * @throws java.io.IOException + * @return deserialized integer from stream. + */ + public static int readVInt(DataInput stream) throws IOException { + long n = readVLong(stream); + if ((n > Integer.MAX_VALUE) || (n < Integer.MIN_VALUE)) { + throw new IOException("value too long to fit in integer"); + } + return (int)n; + } + + /** + * Given the first byte of a vint/vlong, determine the sign + * @param value the first byte + * @return is the value negative + */ + public static boolean isNegativeVInt(byte value) { + return value < -120 || (value >= -112 && value < 0); + } + + /** + * Parse the first byte of a vint/vlong to determine the number of bytes + * @param value the first byte of the vint/vlong + * @return the total number of bytes (1 to 9) + */ + public static int decodeVIntSize(byte value) { + if (value >= -112) { + return 1; + } else if (value < -120) { + return -119 - value; + } + return -111 - value; + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/97170aeb/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java index 75e118e..0fb9bc2 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java @@ -43,7 +43,6 @@ import org.apache.flume.Event; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; -import org.apache.hadoop.io.Writable; import org.junit.Assert; import com.google.common.base.Charsets;
