Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.0 41b09861a -> 92d01e116
PHOENIX-2116: phoenix-flume - Sink/Serializer should be extendable. Removes 'final' from the PhoenixSink class declaration and allows passing a custom event serializer class instead of a built-in one. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/92d01e11 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/92d01e11 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/92d01e11 Branch: refs/heads/4.x-HBase-1.0 Commit: 92d01e11622ef77a68d10747a587c83d1c8a5a31 Parents: 41b0986 Author: Josh Mahonin <jmaho...@interset.com> Authored: Thu Sep 3 11:58:06 2015 -0400 Committer: Josh Mahonin <jmaho...@interset.com> Committed: Thu Sep 3 12:02:13 2015 -0400 ---------------------------------------------------------------------- phoenix-flume/pom.xml | 4 + .../org/apache/phoenix/flume/PhoenixSinkIT.java | 99 +++++++++++++++++--- .../flume/serializer/CustomSerializer.java | 46 +++++++++ .../phoenix/flume/sink/NullPhoenixSink.java | 21 +++++ .../apache/phoenix/flume/sink/PhoenixSink.java | 24 +++-- 5 files changed, 171 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/92d01e11/phoenix-flume/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml index 88741f5..098053d 100644 --- a/phoenix-flume/pom.xml +++ b/phoenix-flume/pom.xml @@ -57,6 +57,10 @@ <artifactId>junit</artifactId> </dependency> <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> + <dependency> <groupId>com.github.stephenc.high-scale-lib</groupId> <artifactId>high-scale-lib</artifactId> <version>1.1.1</version> http://git-wip-us.apache.org/repos/asf/phoenix/blob/92d01e11/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java 
---------------------------------------------------------------------- diff --git a/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java b/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java index 7d4f7af..a59a356 100644 --- a/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java +++ b/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java @@ -17,25 +17,36 @@ */ package org.apache.phoenix.flume; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import org.apache.flume.Channel; -import org.apache.flume.Context; -import org.apache.flume.Sink; -import org.apache.flume.SinkFactory; +import org.apache.flume.*; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; import org.apache.flume.lifecycle.LifecycleState; import org.apache.flume.sink.DefaultSinkFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.flume.serializer.EventSerializers; +import org.apache.phoenix.flume.serializer.CustomSerializer; +import org.apache.phoenix.flume.sink.NullPhoenixSink; import org.apache.phoenix.flume.sink.PhoenixSink; +import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; import org.junit.Assert; import org.junit.Test; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.Properties; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + public class PhoenixSinkIT extends BaseHBaseManagedTimeIT { @@ -79,7 +90,7 @@ public class PhoenixSinkIT extends BaseHBaseManagedTimeIT { Configurables.configure(sink, sinkContext); } - @Test(expected=IllegalArgumentException.class) + 
@Test(expected=RuntimeException.class) public void testInvalidConfigurationOfSerializer () { sinkContext = new Context (); @@ -98,13 +109,13 @@ public class PhoenixSinkIT extends BaseHBaseManagedTimeIT { sinkContext = new Context (); sinkContext.put(FlumeConstants.CONFIG_TABLE, "flume_test"); sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl()); - sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name()); - sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2"); + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.REGEX.name()); + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, "col1,col2"); sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name()); sink = new PhoenixSink(); Configurables.configure(sink, sinkContext); - + final Channel channel = this.initChannel(); sink.setChannel(channel); try { @@ -131,14 +142,14 @@ public class PhoenixSinkIT extends BaseHBaseManagedTimeIT { sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2"); sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name()); - + sink = new PhoenixSink(); Configurables.configure(sink, sinkContext); Assert.assertEquals(LifecycleState.IDLE, sink.getLifecycleState()); - + final Channel channel = this.initChannel(); sink.setChannel(channel); - + sink.start(); Assert.assertEquals(LifecycleState.START, sink.getLifecycleState()); sink.stop(); @@ -160,13 +171,13 @@ public class PhoenixSinkIT extends BaseHBaseManagedTimeIT { sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl); sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_REGULAR_EXPRESSION,"^([^\t]+)\t([^\t]+)$"); sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2"); - sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name()); + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, DefaultKeyGenerator.TIMESTAMP.name()); + - sink = new PhoenixSink(); Configurables.configure(sink, sinkContext); Assert.assertEquals(LifecycleState.IDLE, sink.getLifecycleState()); - + final Channel channel = this.initChannel(); sink.setChannel(channel); @@ -179,6 +190,66 @@ public class PhoenixSinkIT extends BaseHBaseManagedTimeIT { admin.close(); } } + + @Test + public void testExtendedSink() throws Exception { + // Create a mock NullPhoenixSink which extends PhoenixSink, and verify configure is invoked() + + PhoenixSink sink = mock(NullPhoenixSink.class); + sinkContext = new Context(); + sinkContext.put(FlumeConstants.CONFIG_TABLE, "FLUME_TEST_EXTENDED"); + sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl()); + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, CustomSerializer.class.getName()); + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, "ID, COUNTS"); + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, DefaultKeyGenerator.TIMESTAMP.name()); + + Configurables.configure(sink, sinkContext); + verify(sink).configure(sinkContext); + } + + @Test + public void testExtendedSerializer() throws Exception { + /* + Sadly, we can't mock a serializer, as the PhoenixSink does a Class.forName() to instantiate + it. Instead. we'll setup a Flume channel and verify the data our custom serializer wrote. 
+ */ + + final String fullTableName = "FLUME_TEST_EXTENDED"; + final String ddl = "CREATE TABLE " + fullTableName + " (ID BIGINT NOT NULL PRIMARY KEY, COUNTS UNSIGNED_LONG)"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + final Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute(ddl); + conn.commit(); + + sinkContext = new Context(); + sinkContext.put(FlumeConstants.CONFIG_TABLE, "FLUME_TEST_EXTENDED"); + sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl()); + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, CustomSerializer.class.getName()); + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, "ID, COUNTS"); + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, DefaultKeyGenerator.TIMESTAMP.name()); + + PhoenixSink sink = new PhoenixSink(); + Configurables.configure(sink, sinkContext); + + // Send a test event through Flume, using our custom serializer + final Channel channel = this.initChannel(); + sink.setChannel(channel); + sink.start(); + + final Transaction transaction = channel.getTransaction(); + transaction.begin(); + channel.put(EventBuilder.withBody(Bytes.toBytes("test event"))); + transaction.commit(); + transaction.close(); + + sink.process(); + sink.stop(); + + // Verify our serializer wrote out data + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM FLUME_TEST_EXTENDED"); + assertTrue(rs.next()); + assertTrue(rs.getLong(1) == 1L); + } private Channel initChannel() { //Channel configuration http://git-wip-us.apache.org/repos/asf/phoenix/blob/92d01e11/phoenix-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java 
b/phoenix-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java new file mode 100644 index 0000000..a0785ae --- /dev/null +++ b/phoenix-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.flume.serializer; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.List; + +public class CustomSerializer extends BaseEventSerializer { + private static final Logger logger = LoggerFactory.getLogger(CustomSerializer.class); + @Override + public void doConfigure(Context context) { + + } + + @Override + public void doInitialize() throws SQLException { + + } + + @Override + public void upsertEvents(List<Event> events) throws SQLException { + // Just execute a sample UPSERT + connection.createStatement().execute("UPSERT INTO FLUME_TEST_EXTENDED(ID, COUNTS) VALUES(1, 1)"); + connection.commit(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/92d01e11/phoenix-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java b/phoenix-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java new file mode 100644 index 0000000..1df52e1 --- /dev/null +++ b/phoenix-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.flume.sink; + +public class NullPhoenixSink extends PhoenixSink { +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/92d01e11/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java index f9c929d..2b102a2 100644 --- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java +++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java @@ -42,7 +42,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; -public final class PhoenixSink extends AbstractSink implements Configurable { +public class PhoenixSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory.getLogger(PhoenixSink.class); private static AtomicInteger counter = new AtomicInteger(); private static final String NAME = "Phoenix Sink__"; @@ -70,14 +70,13 @@ public final class PhoenixSink extends AbstractSink implements Configurable { * @param eventSerializerType */ private void initializeSerializer(final Context context,final String eventSerializerType) { - - EventSerializers eventSerializer = null; - try { - eventSerializer = EventSerializers.valueOf(eventSerializerType.toUpperCase()); + String serializerClazz = null; + EventSerializers 
eventSerializer = null; + + try { + eventSerializer = EventSerializers.valueOf(eventSerializerType.toUpperCase()); } catch(IllegalArgumentException iae) { - logger.error("An invalid eventSerializer {} was passed. Please specify one of {} ",eventSerializerType, - Joiner.on(",").skipNulls().join(EventSerializers.values())); - Throwables.propagate(iae); + serializerClazz = eventSerializerType; } final Context serializerContext = new Context(); @@ -86,7 +85,14 @@ public final class PhoenixSink extends AbstractSink implements Configurable { try { @SuppressWarnings("unchecked") - Class<? extends EventSerializer> clazz = (Class<? extends EventSerializer>) Class.forName(eventSerializer.getClassName()); + Class<? extends EventSerializer> clazz = null; + if(serializerClazz == null) { + clazz = (Class<? extends EventSerializer>) Class.forName(eventSerializer.getClassName()); + } + else { + clazz = (Class<? extends EventSerializer>) Class.forName(serializerClazz); + } + serializer = clazz.newInstance(); serializer.configure(serializerContext);