Repository: flume Updated Branches: refs/heads/trunk c570a51b3 -> 1e69fc7c2
FLUME-2620. File Channel to support empty values in headers Flume user guide does not specify whether a value in event header could be null or not. Given an external system generating events which header values can be null and a user configures Flume with Memory Channel then he will have no trouble. Later on when the user changes Memory Channel to File Channel then Flume will fail with NPE. It is because FC is serializing events with protocol buffer and header values are defined as required in the proto file. In this patch I have changed the value field to optional. However protocol buffer does not have a notation for null and setting a field to null raises NPE again. Added a null check before serialization to prevent this. There is on caveat: When an optional field is not set, at deserialization it will be set to a default value: in this case it will be empty string. Reviewers: Miklos Csanady (Marcell Hegedus via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1e69fc7c Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1e69fc7c Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1e69fc7c Branch: refs/heads/trunk Commit: 1e69fc7c29f104a2117a62de11cba9b2a2c740e1 Parents: c570a51 Author: Marcell Hegedus <[email protected]> Authored: Wed Jul 19 14:27:56 2017 +0200 Committer: Denes Arvay <[email protected]> Committed: Wed Jul 19 14:27:56 2017 +0200 ---------------------------------------------------------------------- .../java/org/apache/flume/channel/file/Put.java | 7 +++- .../flume/channel/file/proto/ProtosFactory.java | 40 ++++++++------------ .../src/main/proto/filechannel.proto | 2 +- .../flume/channel/file/TestFileChannel.java | 24 ++++++++++++ .../apache/flume/channel/TestMemoryChannel.java | 22 +++++++++++ 5 files changed, 68 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java index 0a70a24..c5ea290 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java @@ -82,8 +82,11 @@ class Put extends TransactionEventRecord { for (String key : headers.keySet()) { String value = headers.get(key); headerBuilder.clear(); - eventBuilder.addHeaders(headerBuilder.setKey(key) - .setValue(value).build()); + headerBuilder.setKey(key); + if (value != null) { + headerBuilder.setValue(value); + } + eventBuilder.addHeaders(headerBuilder.build()); } } eventBuilder.setBody(ByteString.copyFrom(event.getBody())); http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java index 50492cc..202f33d 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java @@ -6831,17 +6831,17 @@ public final class ProtosFactory { com.google.protobuf.ByteString getKeyBytes(); - // required string value = 2; + // optional string value = 2; /** - * <code>required string value = 2;</code> + * <code>optional string value = 2;</code> */ boolean hasValue(); /** - * <code>required string value = 2;</code> + * <code>optional string value = 2;</code> */ java.lang.String getValue(); /** - * <code>required string value = 2;</code> + * <code>optional string value = 2;</code> */ com.google.protobuf.ByteString getValueBytes(); @@ -6990,17 +6990,17 @@ public final class ProtosFactory { } } - // required string value = 2; + // optional string value = 2; public static final int VALUE_FIELD_NUMBER = 2; private java.lang.Object value_; /** - * <code>required string value = 2;</code> + * <code>optional string value = 2;</code> */ public boolean hasValue() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * <code>required string value = 2;</code> + * <code>optional string value = 2;</code> */ public java.lang.String getValue() { java.lang.Object ref = value_; @@ -7017,7 +7017,7 @@ public final class ProtosFactory { } } /** - * <code>required string value = 2;</code> + * <code>optional string value = 2;</code> */ public com.google.protobuf.ByteString getValueBytes() { @@ -7046,10 +7046,6 @@ public final class ProtosFactory { memoizedIsInitialized = 0; return false; } - if (!hasValue()) { - memoizedIsInitialized = 0; - return false; - } memoizedIsInitialized = 1; return true; } @@ -7271,10 +7267,6 @@ public final class ProtosFactory { return false; } - if (!hasValue()) { - - return false; - } return true; } @@ -7371,16 +7363,16 @@ public final class ProtosFactory { return this; } - // required string value = 2; + // optional string value = 2; private java.lang.Object value_ = ""; /** - * <code>required string value = 2;</code> + * <code>optional string value = 2;</code> */ public boolean hasValue() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * <code>required string value = 2;</code> + * <code>optional string value = 2;</code> */ public java.lang.String getValue() { java.lang.Object ref = value_; @@ -7394,7 +7386,7 @@ public final class ProtosFactory { } } /** - * <code>required string value = 2;</code> + * <code>optional string value = 2;</code> */ public com.google.protobuf.ByteString getValueBytes() { @@ -7410,7 +7402,7 @@ public final class ProtosFactory { } } /** - * <code>required string value = 2;</code> + * <code>optional string value = 2;</code> */ public Builder setValue( java.lang.String value) { @@ -7423,7 +7415,7 @@ public final class ProtosFactory { return this; } /** - * <code>required string value = 2;</code> + * <code>optional string value = 2;</code> */ public Builder clearValue() { bitField0_ = (bitField0_ & ~0x00000002); @@ -7432,7 +7424,7 @@ public final class ProtosFactory { return this; } /** - * <code>required string value = 2;</code> + * <code>optional string value = 2;</code> */ public Builder setValueBytes( com.google.protobuf.ByteString value) { @@ -7546,7 +7538,7 @@ public final class ProtosFactory { "ansactionEventFooter\">\n\nFlumeEvent\022\"\n\007he" + "aders\030\001 \003(\0132\021.FlumeEventHeader\022\014\n\004body\030\002", " \002(\014\".\n\020FlumeEventHeader\022\013\n\003key\030\001 \002(\t\022\r\n" + - "\005value\030\002 \002(\tB4\n#org.apache.flume.channel" + + "\005value\030\002 \001(\tB4\n#org.apache.flume.channel" + ".file.protoB\rProtosFactory" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto index 25520e8..929b41d 100644 --- a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto +++ b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto @@ -83,5 +83,5 @@ message FlumeEvent { message FlumeEventHeader { required string key = 1; - required string value = 2; + optional string value = 2; } http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 8efe991..a3d27f7 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -18,6 +18,7 @@ */ package org.apache.flume.channel.file; +import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -69,6 +70,7 @@ public class TestFileChannel extends TestFileChannelBase { private static final Logger LOG = LoggerFactory .getLogger(TestFileChannel.class); + public static final String TEST_KEY = "test_key"; @Before public void setup() throws Exception { @@ -234,6 +236,28 @@ public class TestFileChannel extends TestFileChannelBase { } @Test + public void testPutConvertsNullValueToEmptyStrInHeader() throws Exception { + channel.start(); + + Event event = EventBuilder.withBody("test body".getBytes(Charsets.UTF_8), + Collections.<String, String>singletonMap(TEST_KEY, null)); + + Transaction txPut = channel.getTransaction(); + txPut.begin(); + channel.put(event); + txPut.commit(); + txPut.close(); + + Transaction txTake = channel.getTransaction(); + txTake.begin(); + Event eventTaken = channel.take(); + Assert.assertArrayEquals(event.getBody(), eventTaken.getBody()); + Assert.assertEquals("", eventTaken.getHeaders().get(TEST_KEY)); + txTake.commit(); + txTake.close(); + } + + @Test public void testCommitAfterNoPutTake() throws Exception { channel.start(); Assert.assertTrue(channel.isOpen()); http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java index 344bb58..f7e43eb 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java @@ -19,6 +19,7 @@ package org.apache.flume.channel; +import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; import org.apache.flume.ChannelException; import org.apache.flume.Context; @@ -32,6 +33,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.LinkedBlockingDeque; @@ -72,6 +74,26 @@ public class TestMemoryChannel { } @Test + public void testPutAcceptsNullValueInHeader() { + Configurables.configure(channel, new Context()); + + Event event = EventBuilder.withBody("test body".getBytes(Charsets.UTF_8), + Collections.<String, String>singletonMap("test_key", null)); + + Transaction txPut = channel.getTransaction(); + txPut.begin(); + channel.put(event); + txPut.commit(); + txPut.close(); + + Transaction txTake = channel.getTransaction(); + txTake.begin(); + Event eventTaken = channel.take(); + Assert.assertEquals(event, eventTaken); + txTake.commit(); + } + + @Test public void testChannelResize() { Context context = new Context(); Map<String, String> parms = new HashMap<String, String>();
