API and style improvements to the Kudu Flume Sink This patch cleans up the Flume integration code a bit. Specifically: 1. s/EventProducer/OperationsProducer/ since the "KuduEventProducer" consumed Flume Events and produced Kudu Operations. 2. The KuduOperationsProducer API was changed. Now the initialize method takes just a KuduTable, and should be called once to initialize the KuduOperationsProducer after it is configured. The getOperations method now takes the Event instead. 3. The close method for a KuduOperationsProducer is now called when the KuduSink is stopped. Previously this was implied in a comment but was not true. 4. General clean-up and style improvements.
Change-Id: I357df8cac7daa6ce105f9568cc3af09697032eb6 Reviewed-on: http://gerrit.cloudera.org:8080/4320 Tested-by: Kudu Jenkins Reviewed-by: Mike Percy <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fdfcdba2 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fdfcdba2 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fdfcdba2 Branch: refs/heads/master Commit: fdfcdba21f1c70b882d763e47a4d7d028bf6e2ea Parents: d807034 Author: Will Berkeley <[email protected]> Authored: Tue Sep 6 13:14:50 2016 -0400 Committer: Mike Percy <[email protected]> Committed: Thu Sep 8 19:20:58 2016 +0000 ---------------------------------------------------------------------- docs/release_notes.adoc | 11 + .../kudu/flume/sink/AvroKuduEventProducer.java | 284 ------------------- .../flume/sink/AvroKuduOperationsProducer.java | 279 ++++++++++++++++++ .../kudu/flume/sink/KuduEventProducer.java | 59 ---- .../kudu/flume/sink/KuduOperationsProducer.java | 56 ++++ .../org/apache/kudu/flume/sink/KuduSink.java | 113 ++++---- .../sink/KuduSinkConfigurationConstants.java | 20 +- .../sink/SimpleKeyedKuduEventProducer.java | 96 ------- .../sink/SimpleKeyedKuduOperationsProducer.java | 133 +++++++++ .../flume/sink/SimpleKuduEventProducer.java | 84 ------ .../sink/SimpleKuduOperationsProducer.java | 90 ++++++ .../src/test/avro/testAvroEventProducer.avsc | 11 - .../avro/testAvroKuduOperationsProducer.avsc | 11 + .../flume/sink/AvroKuduEventProducerTest.java | 215 -------------- .../sink/AvroKuduOperationsProducerTest.java | 209 ++++++++++++++ .../flume/sink/KeyedKuduEventProducerTest.java | 218 -------------- .../sink/KeyedKuduOperationsProducerTest.java | 227 +++++++++++++++ 17 files changed, 1085 insertions(+), 1031 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/docs/release_notes.adoc 
---------------------------------------------------------------------- diff --git a/docs/release_notes.adoc b/docs/release_notes.adoc index be89002..fc947b4 100644 --- a/docs/release_notes.adoc +++ b/docs/release_notes.adoc @@ -73,6 +73,17 @@ Kudu 1.0.0 are not supported. are not recommended and may be removed or modified with no deprecation period and without notice in future Kudu releases. +- The KuduEventProducer interface used to process Flume events into Kudu operations + for the Kudu Flume Sink has changed, and has been renamed KuduOperationsProducer. + The existing KuduEventProducers have been updated for the new interface, and have + been renamed similarly. + +[[rn_1.0.0_new_features]] +=== New features + +- The Kudu Flume Sink now supports processing events containing Avro-encoded + records, using the new AvroKuduOperationsProducer. + [[rn_0.10.0]] == Release notes specific to 0.10.0 http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java deleted file mode 100644 index f9ee683..0000000 --- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.kudu.flume.sink; - -import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.UncheckedExecutionException; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.net.URL; -import java.util.Collections; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DecoderFactory; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.FlumeException; -import org.apache.flume.conf.ComponentConfiguration; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kudu.ColumnSchema; -import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.Operation; -import org.apache.kudu.client.PartialRow; - -/** - * <p>An Avro serializer that generates one insert or operation per event by deserializing the event - * body as an Avro record and mapping its fields to columns in a Kudu table. 
- * <p><strong>Avro Kudu Event Producer configuration parameters</strong> - * <table cellpadding=3 cellspacing=0 border=1> - * <tr><th>Property Name</th> - * <th>Default</th> - * <th>Required?</th> - * <th>Description</th></tr> - * <tr><td>producer.operation</td> - * <td>operation</td> - * <td>No</td> - * <td>The operation used to write events to Kudu.</td> - * </tr> - * <tr><td>producer.schemaPath</td> - * <td></td> - * <td>No</td> - * <td>The location of the Avro schema file used to deserialize the Avro-encoded event bodies. - * It's used whenever an event does not include its own schema. If not specified, the - * schema must be specified on a per-event basis, either by url or as a literal. - * Schemas must be a record type. - * </td> - * </tr> - * </table> - */ -public class AvroKuduEventProducer implements KuduEventProducer { - public static final String OPERATION_PROP = "operation"; - public static final String SCHEMA_PROP = "schemaPath"; - public static final String DEFAULT_OPERATION = "upsert"; - public static final String SCHEMA_URL_HEADER = "flume.avro.schema.url"; - public static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal"; - - private String operation; - private DatumReader<GenericRecord> reader; - private GenericRecord reuse; - private KuduTable table; - private byte[] payload; - private String defaultSchemaURL; - - /** - * The binary decoder to reuse for event parsing. - */ - private BinaryDecoder decoder = null; - - public AvroKuduEventProducer() { - } - - private static final Configuration conf = new Configuration(); - - /** - * A cache of schemas retrieved by URL to avoid re-parsing the schema. 
- */ - private static final LoadingCache<String, Schema> schemasFromURL = - CacheBuilder.newBuilder() - .build(new CacheLoader<String, Schema>() { - @Override - public Schema load(String url) throws IOException { - Schema.Parser parser = new Schema.Parser(); - InputStream is = null; - try { - FileSystem fs = FileSystem.get(URI.create(url), conf); - if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) { - is = fs.open(new Path(url)); - } else { - is = new URL(url).openStream(); - } - return parser.parse(is); - } finally { - if (is != null) { - is.close(); - } - } - } - }); - - /** - * A cache of literal schemas to avoid re-parsing the schema. - */ - private static final LoadingCache<String, Schema> schemasFromLiteral = - CacheBuilder.newBuilder() - .build(new CacheLoader<String, Schema>() { - @Override - public Schema load(String literal) { - Preconditions.checkNotNull(literal, - "Schema literal cannot be null without a Schema URL"); - return new Schema.Parser().parse(literal); - } - }); - - /** - * A cache of DatumReaders per schema. 
- */ - private final LoadingCache<Schema, DatumReader<GenericRecord>> readers = - CacheBuilder.newBuilder() - .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() { - @Override - public DatumReader<GenericRecord> load(Schema schema) { - return new GenericDatumReader<>(schema); - } - }); - - @Override - public void configure(Context context) { - this.operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION); - - String schemaPath = context.getString(SCHEMA_PROP); - if (schemaPath != null) { - defaultSchemaURL = schemaPath; - } - } - - @Override - public void configure(ComponentConfiguration conf) { - } - - // NB: this is called once per event - @Override - public void initialize(Event event, KuduTable table) { - this.payload = event.getBody(); - this.table = table; - this.reader = readers.getUnchecked(getSchema(event)); - } - - @Override - public List<Operation> getOperations() throws FlumeException { - decoder = DecoderFactory.get().binaryDecoder(payload, decoder); - try { - reuse = reader.read(reuse, decoder); - } catch (IOException e) { - throw new FlumeException("Cannot deserialize event", e); - } - Operation op; - switch (operation.toLowerCase()) { - case "upsert": - op = table.newUpsert(); - break; - case "insert": - op = table.newInsert(); - break; - default: - throw new FlumeException(String.format("Unexpected operation %s", operation)); - } - setupOp(op, reuse); - return Collections.singletonList(op); - } - - private void setupOp(Operation op, GenericRecord record) { - PartialRow row = op.getRow(); - for (ColumnSchema col : table.getSchema().getColumns()) { - String name = col.getName(); - Object value = record.get(name); - if (value == null) { - if (col.isNullable()) { - row.setNull(name); - } else { - // leave unset for possible Kudu default - } - } else { - // Avro doesn't support 8- or 16-bit integer types, but we'll allow them to be passed as - // a larger type. 
- try { - switch (col.getType()) { - case BOOL: - row.addBoolean(name, (boolean) value); - break; - case INT8: - row.addByte(name, (byte) value); - break; - case INT16: - row.addShort(name, (short) value); - break; - case INT32: - row.addInt(name, (int) value); - break; - case INT64: // Fall through - case TIMESTAMP: - row.addLong(name, (long) value); - break; - case FLOAT: - row.addFloat(name, (float) value); - break; - case DOUBLE: - row.addDouble(name, (double) value); - break; - case STRING: - row.addString(name, value.toString()); - break; - case BINARY: - row.addBinary(name, (byte[]) value); - break; - default: - throw new FlumeException(String.format( - "Unrecognized type %s for column %s", col.getType().toString(), name)); - } - } catch (ClassCastException e) { - throw new FlumeException(String.format("Failed to coerce value for column %s to type %s", - col.getName(), - col.getType())); - } - } - } - } - - private Schema getSchema(Event event) throws FlumeException { - Map<String, String> headers = event.getHeaders(); - String schemaURL = headers.get(SCHEMA_URL_HEADER); - String schemaLiteral = headers.get(SCHEMA_LITERAL_HEADER); - try { - if (schemaURL != null) { - return schemasFromURL.get(schemaURL); - } else if (schemaLiteral != null) { - return schemasFromLiteral.get(schemaLiteral); - } else if (defaultSchemaURL != null) { - return schemasFromURL.get(defaultSchemaURL); - } else { - throw new FlumeException( - String.format("No schema for event! 
Specify configuration property %s or event header %s", - SCHEMA_PROP, - SCHEMA_URL_HEADER)); - } - } catch (ExecutionException ex) { - throw new FlumeException("Cannot get schema", ex); - } catch (UncheckedExecutionException ex) { - throw new FlumeException("Cannot parse schema", ex); - } - } - - @Override - public void close() { - } -} http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java new file mode 100644 index 0000000..f799162 --- /dev/null +++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.kudu.flume.sink; + +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.UncheckedExecutionException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.PartialRow; + +/** + * <p>An Avro serializer that generates one operation per event by deserializing the event + * body as an Avro record and mapping its fields to columns in a Kudu table. + * <p><strong>Avro Kudu Operations Producer configuration parameters</strong> + * <table cellpadding=3 cellspacing=0 border=1> + * <tr><th>Property Name</th> + * <th>Default</th> + * <th>Required?</th> + * <th>Description</th></tr> + * <tr> + * <td>producer.operation</td> + * <td>upsert</td> + * <td>No</td> + * <td>The operation used to write events to Kudu. + * Supported operations are 'insert' and 'upsert'</td> + * </tr> + * <tr> + * <td>producer.schemaPath</td> + * <td></td> + * <td>No</td> + * <td>The location of the Avro schema file used to deserialize the Avro-encoded event bodies. 
+ * It's used whenever an event does not include its own schema. If not specified, the + * schema must be specified on a per-event basis, either by url or as a literal. + * Schemas must be record type.</td> + * </tr> + * </table> + */ +public class AvroKuduOperationsProducer implements KuduOperationsProducer { + public static final String OPERATION_PROP = "operation"; + public static final String SCHEMA_PROP = "schemaPath"; + public static final String DEFAULT_OPERATION = "upsert"; + public static final String SCHEMA_URL_HEADER = "flume.avro.schema.url"; + public static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal"; + + private String operation; + private GenericRecord reuse; + private KuduTable table; + private String defaultSchemaURL; + + /** + * The binary decoder to reuse for event parsing. + */ + private BinaryDecoder decoder = null; + + /** + * A cache of schemas retrieved by URL to avoid re-parsing the schema. + */ + private static final LoadingCache<String, Schema> schemasFromURL = + CacheBuilder.newBuilder() + .build(new CacheLoader<String, Schema>() { + @Override + public Schema load(String url) throws IOException { + Schema.Parser parser = new Schema.Parser(); + InputStream is = null; + try { + FileSystem fs = FileSystem.get(URI.create(url), conf); + if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) { + is = fs.open(new Path(url)); + } else { + is = new URL(url).openStream(); + } + return parser.parse(is); + } finally { + if (is != null) { + is.close(); + } + } + } + }); + + /** + * A cache of literal schemas to avoid re-parsing the schema. 
+ */ + private static final LoadingCache<String, Schema> schemasFromLiteral = + CacheBuilder.newBuilder() + .build(new CacheLoader<String, Schema>() { + @Override + public Schema load(String literal) { + Preconditions.checkNotNull(literal, + "Schema literal cannot be null without a Schema URL"); + return new Schema.Parser().parse(literal); + } + }); + + /** + * A cache of DatumReaders per schema. + */ + private static final LoadingCache<Schema, DatumReader<GenericRecord>> readers = + CacheBuilder.newBuilder() + .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() { + @Override + public DatumReader<GenericRecord> load(Schema schema) { + return new GenericDatumReader<>(schema); + } + }); + + private static final Configuration conf = new Configuration(); + + public AvroKuduOperationsProducer() { + } + + @Override + public void configure(Context context) { + this.operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION); + + String schemaPath = context.getString(SCHEMA_PROP); + if (schemaPath != null) { + defaultSchemaURL = schemaPath; + } + } + + @Override + public void initialize(KuduTable table) { + this.table = table; + } + + @Override + public List<Operation> getOperations(Event event) throws FlumeException { + DatumReader<GenericRecord> reader = readers.getUnchecked(getSchema(event)); + decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder); + try { + reuse = reader.read(reuse, decoder); + } catch (IOException e) { + throw new FlumeException("Cannot deserialize event", e); + } + Operation op; + switch (operation.toLowerCase()) { + case "upsert": + op = table.newUpsert(); + break; + case "insert": + op = table.newInsert(); + break; + default: + throw new FlumeException(String.format("Unexpected operation %s", operation)); + } + setupOp(op, reuse); + return Collections.singletonList(op); + } + + private void setupOp(Operation op, GenericRecord record) { + PartialRow row = op.getRow(); + for (ColumnSchema col : 
table.getSchema().getColumns()) { + String name = col.getName(); + Object value = record.get(name); + if (value == null) { + if (col.isNullable()) { + row.setNull(name); + } else { + // leave unset for possible Kudu default + } + } else { + // Avro doesn't support 8- or 16-bit integer types, but we'll allow them to be passed as + // a larger type. + try { + switch (col.getType()) { + case BOOL: + row.addBoolean(name, (boolean) value); + break; + case INT8: + row.addByte(name, (byte) value); + break; + case INT16: + row.addShort(name, (short) value); + break; + case INT32: + row.addInt(name, (int) value); + break; + case INT64: // Fall through + case TIMESTAMP: + row.addLong(name, (long) value); + break; + case FLOAT: + row.addFloat(name, (float) value); + break; + case DOUBLE: + row.addDouble(name, (double) value); + break; + case STRING: + row.addString(name, value.toString()); + break; + case BINARY: + row.addBinary(name, (byte[]) value); + break; + default: + throw new FlumeException(String.format( + "Unrecognized type %s for column %s", col.getType().toString(), name)); + } + } catch (ClassCastException e) { + throw new FlumeException( + String.format("Failed to coerce value for column '%s' to type %s", + col.getName(), + col.getType())); + } + } + } + } + + private Schema getSchema(Event event) throws FlumeException { + Map<String, String> headers = event.getHeaders(); + String schemaURL = headers.get(SCHEMA_URL_HEADER); + String schemaLiteral = headers.get(SCHEMA_LITERAL_HEADER); + try { + if (schemaURL != null) { + return schemasFromURL.get(schemaURL); + } else if (schemaLiteral != null) { + return schemasFromLiteral.get(schemaLiteral); + } else if (defaultSchemaURL != null) { + return schemasFromURL.get(defaultSchemaURL); + } else { + throw new FlumeException( + String.format("No schema for event. 
" + + "Specify configuration property '%s' or event header '%s'", + SCHEMA_PROP, + SCHEMA_URL_HEADER)); + } + } catch (ExecutionException ex) { + throw new FlumeException("Cannot get schema", ex); + } catch (UncheckedExecutionException ex) { + throw new FlumeException("Cannot parse schema", ex); + } + } + + @Override + public void close() { + } +} http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduEventProducer.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduEventProducer.java deleted file mode 100644 index 95f63b1..0000000 --- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduEventProducer.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.kudu.flume.sink; - -import org.apache.flume.Event; -import org.apache.flume.conf.Configurable; -import org.apache.flume.conf.ConfigurableComponent; -import org.apache.kudu.annotations.InterfaceAudience; -import org.apache.kudu.annotations.InterfaceStability; -import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.Operation; - -import java.util.List; - -/** - * Interface for an event producer which produces Kudu Operations to write - * the headers and body of an event in a Kudu table. This is configurable, - * so any config params required should be taken through this. The columns - * should exist in the table specified in the configuration for the KuduSink. - */ [email protected] [email protected] -public interface KuduEventProducer extends Configurable, ConfigurableComponent { - /** - * Initialize the event producer. - * @param event to be written to Kudu - * @param table the KuduTable object used for creating Kudu Operation objects - */ - void initialize(Event event, KuduTable table); - - /** - * Get the operations that should be written out to Kudu as a result of this - * event. This list is written to Kudu using the Kudu client API. - * @return List of {@link org.apache.kudu.client.Operation} which - * are written as such to Kudu - */ - List<Operation> getOperations(); - - /* - * Clean up any state. This will be called when the sink is being stopped. 
- */ - void close(); -} http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java new file mode 100644 index 0000000..8816e95 --- /dev/null +++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.kudu.flume.sink; + +import org.apache.flume.Event; +import org.apache.flume.conf.Configurable; +import org.apache.kudu.annotations.InterfaceAudience; +import org.apache.kudu.annotations.InterfaceStability; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; + +import java.util.List; + +/** + * Interface for an operations producer that produces Kudu Operations from + * Flume events. 
+ */ [email protected] [email protected] +public interface KuduOperationsProducer extends Configurable, AutoCloseable { + /** + * Initializes the operations producer. Called between configure and + * getOperations. + * @param table the KuduTable used to create Kudu Operation objects + */ + void initialize(KuduTable table); + + /** + * Returns the operations that should be written to Kudu as a result of this + * event. + * @return List of {@link org.apache.kudu.client.Operation} that + * should be written to Kudu + */ + List<Operation> getOperations(Event event); + + /** + * Cleans up any state. Called when the sink is stopped. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java index 8c206d8..b80462c 100644 --- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java +++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java @@ -18,6 +18,13 @@ */ package org.apache.kudu.flume.sink; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.BATCH_SIZE; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TIMEOUT_MILLIS; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -53,13 +60,13 @@ import java.util.List; 
* <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr> * <tr><td>channel</td><td></td><td>Yes</td><td>The name of the Flume channel to read from.</td></tr> * <tr><td>type</td><td></td><td>Yes</td><td>Component name. Must be {@code org.apache.kudu.flume.sink.KuduSink}</td></tr> - * <tr><td>masterAddresses</td><td></td><td>Yes</td><td>Comma-separated list of "host:port" pairs of the Kudu master servers. The port is optional.</td></tr> + * <tr><td>masterAddresses</td><td></td><td>Yes</td><td>Comma-separated list of "host:port" Kudu master addresses. The port is optional.</td></tr> * <tr><td>tableName</td><td></td><td>Yes</td><td>The name of the Kudu table to write to.</td></tr> * <tr><td>batchSize</td><td>100</td><td>No</td><td>The maximum number of events the sink will attempt to take from the channel per transaction.</td></tr> - * <tr><td>ignoreDuplicateRows</td><td>true</td><td>No</td><td>Whether to ignore errors indicating that we attempted to insert duplicate rows into Kudu.</td></tr> + * <tr><td>ignoreDuplicateRows</td><td>true</td><td>No</td><td>Whether to ignore duplicate primary key errors caused by inserts.</td></tr> * <tr><td>timeoutMillis</td><td>10000</td><td>No</td><td>Timeout period for Kudu write operations, in milliseconds.</td></tr> - * <tr><td>producer</td><td>{@link org.apache.kudu.flume.sink.SimpleKuduEventProducer}</td><td>No</td><td>The fully qualified class name of the {@link KuduEventProducer} the sink should use.</td></tr> - * <tr><td>producer.*</td><td></td><td>(Varies by event producer)</td><td>Configuration properties to pass to the event producer implementation.</td></tr> + * <tr><td>producer</td><td>{@link SimpleKuduOperationsProducer}</td><td>No</td><td>The fully qualified class name of the {@link KuduOperationsProducer} the sink should use.</td></tr> + * <tr><td>producer.*</td><td></td><td>(Varies by operations producer)</td><td>Configuration properties to pass to the operations producer 
implementation.</td></tr> * </table> * * <p><strong>Installation</strong> @@ -78,8 +85,8 @@ public class KuduSink extends AbstractSink implements Configurable { private static final Long DEFAULT_BATCH_SIZE = 100L; private static final Long DEFAULT_TIMEOUT_MILLIS = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS; - private static final String DEFAULT_KUDU_EVENT_PRODUCER = - "org.apache.kudu.flume.sink.SimpleKuduEventProducer"; + private static final String DEFAULT_KUDU_OPERATION_PRODUCER = + "org.apache.kudu.flume.sink.SimpleKuduOperationsProducer"; private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true; private String masterAddresses; @@ -90,9 +97,7 @@ public class KuduSink extends AbstractSink implements Configurable { private KuduTable table; private KuduSession session; private KuduClient client; - private KuduEventProducer eventProducer; - private String eventProducerType; - private Context producerContext; + private KuduOperationsProducer operationsProducer; private SinkCounter sinkCounter; public KuduSink() { @@ -107,10 +112,10 @@ public class KuduSink extends AbstractSink implements Configurable { @Override public void start() { - Preconditions.checkState(table == null && session == null, "Please call stop " + - "before calling start on an old instance."); + Preconditions.checkState(table == null && session == null, + "Please call stop before calling start on an old instance."); - // This is not null only inside tests + // client is not null only inside tests if (client == null) { client = new KuduClient.KuduClientBuilder(masterAddresses).build(); } @@ -123,10 +128,11 @@ public class KuduSink extends AbstractSink implements Configurable { table = client.openTable(tableName); } catch (Exception e) { sinkCounter.incrementConnectionFailedCount(); - String msg = String.format("Could not open table '%s' from Kudu", tableName); + String msg = String.format("Could not open Kudu table '%s'", tableName); logger.error(msg, e); throw new FlumeException(msg, e); 
} + operationsProducer.initialize(table); super.start(); sinkCounter.incrementConnectionCreatedCount(); @@ -135,6 +141,13 @@ public class KuduSink extends AbstractSink implements Configurable { @Override public void stop() { + Exception ex = null; + try { + operationsProducer.close(); + } catch (Exception e) { + ex = e; + logger.error("Error closing operations producer", e); + } try { if (client != null) { client.shutdown(); @@ -143,53 +156,52 @@ public class KuduSink extends AbstractSink implements Configurable { table = null; session = null; } catch (Exception e) { - throw new FlumeException("Error closing client.", e); + ex = e; + logger.error("Error closing client", e); } sinkCounter.incrementConnectionClosedCount(); sinkCounter.stop(); + if (ex != null) { + throw new FlumeException("Error stopping sink", ex); + } } @SuppressWarnings("unchecked") @Override public void configure(Context context) { - masterAddresses = context.getString(KuduSinkConfigurationConstants.MASTER_ADDRESSES); - tableName = context.getString(KuduSinkConfigurationConstants.TABLE_NAME); - - batchSize = context.getLong( - KuduSinkConfigurationConstants.BATCH_SIZE, DEFAULT_BATCH_SIZE); - timeoutMillis = context.getLong( - KuduSinkConfigurationConstants.TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS); - ignoreDuplicateRows = context.getBoolean( - KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS); - eventProducerType = context.getString(KuduSinkConfigurationConstants.PRODUCER); - + masterAddresses = context.getString(MASTER_ADDRESSES); Preconditions.checkNotNull(masterAddresses, - "Master address cannot be empty, please specify '" + - KuduSinkConfigurationConstants.MASTER_ADDRESSES + - "' in configuration file"); + "Missing master addresses. 
Please specify property '%s'.", + MASTER_ADDRESSES); + + tableName = context.getString(TABLE_NAME); Preconditions.checkNotNull(tableName, - "Table name cannot be empty, please specify '" + - KuduSinkConfigurationConstants.TABLE_NAME + - "' in configuration file"); - - // Check for event producer, if null set event producer type. - if (eventProducerType == null || eventProducerType.isEmpty()) { - eventProducerType = DEFAULT_KUDU_EVENT_PRODUCER; - logger.info("No Kudu event producer defined, will use default"); + "Missing table name. Please specify property '%s'", + TABLE_NAME); + + batchSize = context.getLong(BATCH_SIZE, DEFAULT_BATCH_SIZE); + timeoutMillis = context.getLong(TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS); + ignoreDuplicateRows = context.getBoolean(IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS); + String operationProducerType = context.getString(PRODUCER); + + // Check for operations producer, if null set default operations producer type. + if (operationProducerType == null || operationProducerType.isEmpty()) { + operationProducerType = DEFAULT_KUDU_OPERATION_PRODUCER; + logger.warn("No Kudu operations producer provided, using default"); } - producerContext = new Context(); + Context producerContext = new Context(); producerContext.putAll(context.getSubProperties( KuduSinkConfigurationConstants.PRODUCER_PREFIX)); try { - Class<? extends KuduEventProducer> clazz = - (Class<? extends KuduEventProducer>) - Class.forName(eventProducerType); - eventProducer = clazz.newInstance(); - eventProducer.configure(producerContext); + Class<? extends KuduOperationsProducer> clazz = + (Class<? extends KuduOperationsProducer>) + Class.forName(operationProducerType); + operationsProducer = clazz.newInstance(); + operationsProducer.configure(producerContext); } catch (Exception e) { - logger.error("Could not instantiate Kudu event producer."
, e); + logger.error("Could not instantiate Kudu operations producer" , e); Throwables.propagate(e); } sinkCounter = new SinkCounter(this.getName()); @@ -202,9 +214,9 @@ public class KuduSink extends AbstractSink implements Configurable { @Override public Status process() throws EventDeliveryException { if (session.hasPendingOperations()) { - // If for whatever reason we have pending operations then just refuse to process - // and tell caller to try again a bit later. We don't want to pile on the kudu - // session object. + // If for whatever reason we have pending operations, refuse to process + // more and tell the caller to try again a bit later. We don't want to + // pile on the KuduSession. return Status.BACKOFF; } @@ -221,8 +233,7 @@ public class KuduSink extends AbstractSink implements Configurable { break; } - eventProducer.initialize(event, table); - List<Operation> operations = eventProducer.getOperations(); + List<Operation> operations = operationsProducer.getOperations(event); for (Operation o : operations) { session.apply(o); } @@ -280,10 +291,4 @@ public class KuduSink extends AbstractSink implements Configurable { return Status.BACKOFF; } - - @VisibleForTesting - @InterfaceAudience.Private - KuduEventProducer getEventProducer() { - return eventProducer; - } } http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java index 7da815e..e5f7342 100644 --- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java +++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java @@ -22,15 +22,13 @@ package 
org.apache.kudu.flume.sink; import org.apache.kudu.annotations.InterfaceAudience; import org.apache.kudu.annotations.InterfaceStability; -/** - * Constants used for configuration of KuduSink - */ - @InterfaceAudience.Public @InterfaceStability.Evolving public class KuduSinkConfigurationConstants { /** - * Comma-separated list of "host:port" pairs of the masters (port optional). + * Comma-separated list of "host:port" Kudu master addresses. + * The port is optional and defaults to the Kudu Java client's default master + * port. */ public static final String MASTER_ADDRESSES = "masterAddresses"; @@ -40,18 +38,20 @@ public class KuduSinkConfigurationConstants { public static final String TABLE_NAME = "tableName"; /** - * The fully qualified class name of the Kudu event producer the sink should use. + * The fully qualified class name of the KuduOperationsProducer class that the + * sink should use. */ public static final String PRODUCER = "producer"; /** - * Configuration to pass to the Kudu event producer. + * Prefix for configuration parameters that are passed to the + * KuduOperationsProducer. */ public static final String PRODUCER_PREFIX = PRODUCER + "."; /** - * Maximum number of events the sink should take from the channel per - * transaction, if available. + * Maximum number of events that the sink should take from the channel per + * transaction. */ public static final String BATCH_SIZE = "batchSize"; @@ -61,7 +61,7 @@ public class KuduSinkConfigurationConstants { public static final String TIMEOUT_MILLIS = "timeoutMillis"; /** - * Whether to ignore errors indicating that we attempted to insert duplicate rows into Kudu. + * Whether to ignore duplicate primary key errors caused by inserts. 
*/ public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows"; } http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduEventProducer.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduEventProducer.java deleted file mode 100644 index 534fd33..0000000 --- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduEventProducer.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.kudu.flume.sink; - -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.FlumeException; -import org.apache.flume.conf.ComponentConfiguration; -import org.apache.kudu.client.Insert; -import org.apache.kudu.client.Upsert; -import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.Operation; -import org.apache.kudu.client.PartialRow; - -import java.util.Collections; -import java.util.List; - -/** - * <p>A simple serializer that generates one {@link Insert} or {@link Upsert} per {@link Event} by writing the event - * body into a BINARY column. The pair (key column name, key column value) should be a header in the {@link Event}; - * the column name is configurable but the column type must be STRING. Multiple key columns are not supported.</p> - * - * <p><strong>Simple Keyed Kudu Event Producer configuration parameters</strong></p> - * - * <table cellpadding=3 cellspacing=0 border=1> - * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr> - * <tr><td>producer.payloadColumn</td><td>payload</td><td>No</td><td>The name of the BINARY column to write the Flume event body to.</td></tr> - * <tr><td>producer.keyColumn</td><td>key</td><td>No</td><td>The name of the STRING key column of the target Kudu table.</td></tr> - * <tr><td>producer.upsert</td><td>false</td><td>No</td><td>Whether to insert or upsert events.</td></tr> - * </table> - */ -public class SimpleKeyedKuduEventProducer implements KuduEventProducer { - private byte[] payload; - private String key; - private KuduTable table; - private String payloadColumn; - private String keyColumn; - private boolean upsert; - - public SimpleKeyedKuduEventProducer(){ - } - - @Override - public void configure(Context context) { - payloadColumn = context.getString("payloadColumn","payload"); - keyColumn = context.getString("keyColumn", "key"); - upsert = context.getBoolean("upsert", false); - } - - @Override - public void 
configure(ComponentConfiguration conf) { - } - - @Override - public void initialize(Event event, KuduTable table) { - this.payload = event.getBody(); - this.key = event.getHeaders().get(keyColumn); - this.table = table; - } - - @Override - public List<Operation> getOperations() throws FlumeException { - try { - Operation op = (upsert) ? table.newUpsert() : table.newInsert(); - PartialRow row = op.getRow(); - row.addString(keyColumn, key); - row.addBinary(payloadColumn, payload); - - return Collections.singletonList(op); - } catch (Exception e){ - throw new FlumeException("Failed to create Kudu Operation object!", e); - } - } - - @Override - public void close() { - } -} - http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java new file mode 100644 index 0000000..8360df8 --- /dev/null +++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kudu.flume.sink; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.Upsert; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.PartialRow; + +import java.util.Collections; +import java.util.List; + +/** + * <p>A simple serializer that generates one {@link Insert} or {@link Upsert} + * per {@link Event} by writing the event body into a BINARY column. The pair + * (key column name, key column value) should be a header in the {@link Event}; + * the column name is configurable but the column type must be STRING. Multiple + * key columns are not supported. + * + * <p><strong>Simple Keyed Kudu Operations Producer configuration parameters</strong> + * + * <table cellpadding=3 cellspacing=0 border=1> + * <tr> + * <th>Property Name</th> + * <th>Default</th> + * <th>Required?</th> + * <th>Description</th> + * </tr> + * <tr> + * <td>producer.payloadColumn</td> + * <td>payload</td> + * <td>No</td> + * <td>The name of the BINARY column to write the Flume event body to.</td> + * </tr> + * <tr> + * <td>producer.keyColumn</td> + * <td>key</td> + * <td>No</td> + * <td>The name of the STRING key column of the target Kudu table.</td> + * </tr> + * <tr> + * <td>producer.operation</td> + * <td>upsert</td> + * <td>No</td> + * <td>The operation used to write events to Kudu. 
Supported operations + * are 'insert' and 'upsert'</td> + * </tr> + * </table> + */ +public class SimpleKeyedKuduOperationsProducer implements KuduOperationsProducer { + public static final String PAYLOAD_COLUMN_PROP = "payloadColumn"; + public static final String PAYLOAD_COLUMN_DEFAULT = "payload"; + public static final String KEY_COLUMN_PROP = "keyColumn"; + public static final String KEY_COLUMN_DEFAULT = "key"; + public static final String OPERATION_PROP = "operation"; + public static final String OPERATION_DEFAULT = "upsert"; + + private KuduTable table; + private String payloadColumn; + private String keyColumn; + private String operation; + + public SimpleKeyedKuduOperationsProducer(){ + } + + @Override + public void configure(Context context) { + payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT); + keyColumn = context.getString(KEY_COLUMN_PROP, KEY_COLUMN_DEFAULT); + operation = context.getString(OPERATION_PROP, OPERATION_DEFAULT); + } + + @Override + public void initialize(KuduTable table) { + this.table = table; + } + + @Override + public List<Operation> getOperations(Event event) throws FlumeException { + String key = event.getHeaders().get(keyColumn); + if (key == null) { + throw new FlumeException( + String.format("No value provided for key column %s", keyColumn)); + } + try { + Operation op; + switch (operation.toLowerCase()) { + case "upsert": + op = table.newUpsert(); + break; + case "insert": + op = table.newInsert(); + break; + default: + throw new FlumeException( + String.format("Unexpected operation %s", operation)); + } + PartialRow row = op.getRow(); + row.addString(keyColumn, key); + row.addBinary(payloadColumn, event.getBody()); + + return Collections.singletonList(op); + } catch (Exception e){ + throw new FlumeException("Failed to create Kudu Operation object", e); + } + } + + @Override + public void close() { + } +} + 
http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java deleted file mode 100644 index 2faf1a1..0000000 --- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.kudu.flume.sink; - -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.FlumeException; -import org.apache.flume.conf.ComponentConfiguration; -import org.apache.kudu.client.Insert; -import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.Operation; -import org.apache.kudu.client.PartialRow; - -import java.util.Collections; -import java.util.List; - -/** - * <p>A simple serializer that generates one {@link Insert} per {@link Event} by writing the event - * body into a BINARY column. 
The headers are discarded.</p> - * - * <p><strong>Simple Kudu Event Producer configuration parameters</strong></p> - * - * <table cellpadding=3 cellspacing=0 border=1> - * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr> - * <tr><td>producer.payloadColumn</td><td>payload</td><td>No</td><td>The name of the BINARY column to write the Flume the event body to.</td></tr> - * </table> - */ -public class SimpleKuduEventProducer implements KuduEventProducer { - private byte[] payload; - private KuduTable table; - private String payloadColumn; - - public SimpleKuduEventProducer(){ - } - - @Override - public void configure(Context context) { - payloadColumn = context.getString("payloadColumn","payload"); - } - - @Override - public void configure(ComponentConfiguration conf) { - } - - @Override - public void initialize(Event event, KuduTable table) { - this.payload = event.getBody(); - this.table = table; - } - - @Override - public List<Operation> getOperations() throws FlumeException { - try { - Insert insert = table.newInsert(); - PartialRow row = insert.getRow(); - row.addBinary(payloadColumn, payload); - - return Collections.singletonList((Operation) insert); - } catch (Exception e){ - throw new FlumeException("Failed to create Kudu Insert object!", e); - } - } - - @Override - public void close() { - } -} http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java new file mode 100644 index 0000000..f5f5838 --- /dev/null +++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kudu.flume.sink; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.PartialRow; + +import java.util.Collections; +import java.util.List; + +/** + * <p>A simple serializer that generates one {@link Insert} per {@link Event} + * by writing the event body into a BINARY column. The headers are discarded. 
+ * + * <p><strong>Simple Kudu Event Producer configuration parameters</strong> + * + * <table cellpadding=3 cellspacing=0 border=1> + * <tr> + * <th>Property Name</th> + * <th>Default</th> + * <th>Required?</th> + * <th>Description</th> + * </tr> + * <tr> + * <td>producer.payloadColumn</td> + * <td>payload</td> + * <td>No</td> + * <td>The name of the BINARY column to write the Flume the event body to.</td> + * </tr> + * </table> + */ +public class SimpleKuduOperationsProducer implements KuduOperationsProducer { + public static final String PAYLOAD_COLUMN_PROP = "payloadColumn"; + public static final String PAYLOAD_COLUMN_DEFAULT = "payload"; + + private KuduTable table; + private String payloadColumn; + + public SimpleKuduOperationsProducer(){ + } + + @Override + public void configure(Context context) { + payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT); + } + + @Override + public void initialize(KuduTable table) { + this.table = table; + } + + @Override + public List<Operation> getOperations(Event event) throws FlumeException { + try { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addBinary(payloadColumn, event.getBody()); + + return Collections.singletonList((Operation) insert); + } catch (Exception e){ + throw new FlumeException("Failed to create Kudu Insert object", e); + } + } + + @Override + public void close() { + } +} http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/test/avro/testAvroEventProducer.avsc ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/avro/testAvroEventProducer.avsc b/java/kudu-flume-sink/src/test/avro/testAvroEventProducer.avsc deleted file mode 100644 index d86955c..0000000 --- a/java/kudu-flume-sink/src/test/avro/testAvroEventProducer.avsc +++ /dev/null @@ -1,11 +0,0 @@ -{"namespace": "org.apache.kudu.flume.sink", - "type": "record", - "name": "AvroKuduEventProducerTestRecord", 
- "fields": [ - {"name": "key", "type": "int"}, - {"name": "longField", "type": "long"}, - {"name": "doubleField", "type": "double"}, - {"name": "nullableField", "type": ["string", "null"]}, - {"name": "stringField", "type": "string"} - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc b/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc new file mode 100644 index 0000000..b562c3a --- /dev/null +++ b/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc @@ -0,0 +1,11 @@ +{"namespace": "org.apache.kudu.flume.sink", + "type": "record", + "name": "AvroKuduOperationsProducerTestRecord", + "fields": [ + {"name": "key", "type": "int"}, + {"name": "longField", "type": "long"}, + {"name": "doubleField", "type": "double"}, + {"name": "nullableField", "type": ["string", "null"]}, + {"name": "stringField", "type": "string"} + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduEventProducerTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduEventProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduEventProducerTest.java deleted file mode 100644 index c1de448..0000000 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduEventProducerTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.kudu.flume.sink; - -import static org.apache.kudu.flume.sink.AvroKuduEventProducer.SCHEMA_LITERAL_HEADER; -import static org.apache.kudu.flume.sink.AvroKuduEventProducer.SCHEMA_PROP; -import static org.apache.kudu.flume.sink.AvroKuduEventProducer.SCHEMA_URL_HEADER; -import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES; -import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER; -import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX; -import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.io.Files; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.Encoder; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.flume.Channel; 
-import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.FlumeException; -import org.apache.flume.Sink; -import org.apache.flume.Transaction; -import org.apache.flume.channel.MemoryChannel; -import org.apache.flume.conf.Configurables; -import org.apache.flume.event.EventBuilder; -import org.apache.kudu.ColumnSchema; -import org.apache.kudu.Schema; -import org.apache.kudu.Type; -import org.apache.kudu.client.BaseKuduTest; -import org.apache.kudu.client.CreateTableOptions; -import org.apache.kudu.client.KuduTable; -import org.junit.BeforeClass; -import org.junit.Test; - -public class AvroKuduEventProducerTest extends BaseKuduTest { - private static final String schemaPath = "src/test/avro/testAvroEventProducer.avsc"; - private static String schemaLiteral; - - private static org.apache.avro.Schema schema; - - enum SchemaLocation { - GLOBAL, URL, LITERAL - } - - @BeforeClass - public static void setupAvroSchemaBeforeClass() { - org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser(); - try { - schemaLiteral = Files.toString(new File(schemaPath), Charsets.UTF_8); - schema = parser.parse(new File(schemaPath)); - } catch (IOException e) { - throw new FlumeException("Unable to open and parse schema file!", e); - } - } - - @Test - public void testEmptyChannel() throws Exception { - testEvents(0, SchemaLocation.GLOBAL); - } - - @Test - public void testOneEvent() throws Exception { - testEvents(1, SchemaLocation.GLOBAL); - } - - @Test - public void testThreeEvents() throws Exception { - testEvents(3, SchemaLocation.GLOBAL); - } - - @Test - public void testThreeEventsSchemaURLInEvent() throws Exception { - testEvents(3, SchemaLocation.URL); - } - - @Test - public void testThreeEventsSchemaLiteralInEvent() throws Exception { - testEvents(3, SchemaLocation.LITERAL); - } - - private void testEvents(int eventCount, SchemaLocation schemaLocation) - throws Exception { - KuduTable table = createNewTable( - 
String.format("test%sevents%s", eventCount, schemaLocation)); - String tableName = table.getName(); - String schemaURI = new File(schemaPath).getAbsoluteFile().toURI().toString(); - Context ctx = schemaLocation != SchemaLocation.GLOBAL ? new Context() - : new Context(ImmutableMap.of(PRODUCER_PREFIX + SCHEMA_PROP, schemaURI)); - KuduSink sink = createSink(tableName, ctx); - - Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - sink.setChannel(channel); - sink.start(); - - Transaction tx = channel.getTransaction(); - tx.begin(); - writeEventsToChannel(channel, eventCount, schemaLocation); - tx.commit(); - tx.close(); - - Sink.Status status = sink.process(); - if (eventCount == 0) { - assertEquals("incorrect status for empty channel", status, Sink.Status.BACKOFF); - } else { - assertEquals("incorrect status for non-empty channel", status, Sink.Status.READY); - } - - List<String> answers = makeAnswers(eventCount); - List<String> rows = scanTableToStrings(table); - assertEquals("wrong number of rows inserted", answers.size(), rows.size()); - assertArrayEquals("wrong rows inserted", answers.toArray(), rows.toArray()); - } - - private KuduTable createNewTable(String tableName) throws Exception { - ArrayList<ColumnSchema> columns = new ArrayList<>(5); - columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build()); - columns.add(new ColumnSchema.ColumnSchemaBuilder("longField", Type.INT64).build()); - columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build()); - columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING).nullable(true).build()); - columns.add(new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build()); - CreateTableOptions createOptions = - new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key")) - .setNumReplicas(1); - return createTable(tableName, new Schema(columns), createOptions); - } - - private KuduSink 
createSink(String tableName, Context ctx) { - KuduSink sink = new KuduSink(syncClient); - HashMap<String, String> parameters = new HashMap<>(); - parameters.put(TABLE_NAME, tableName); - parameters.put(MASTER_ADDRESSES, getMasterAddresses()); - parameters.put(PRODUCER, - "org.apache.kudu.flume.sink.AvroKuduEventProducer"); - Context context = new Context(parameters); - context.putAll(ctx.getParameters()); - Configurables.configure(sink, context); - - return sink; - } - - private void writeEventsToChannel(Channel channel, int eventCount, - SchemaLocation schemaLocation) throws Exception { - for (int i = 0; i < eventCount; i++) { - AvroKuduEventProducerTestRecord record = new AvroKuduEventProducerTestRecord(); - record.setKey(10 * i); - record.setLongField(2L * i); - record.setDoubleField(2.71828 * i); - record.setNullableField(i % 2 == 0 ? null : "taco"); - record.setStringField(String.format("hello %d", i)); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); - DatumWriter<AvroKuduEventProducerTestRecord> writer = - new SpecificDatumWriter<>(AvroKuduEventProducerTestRecord.class); - writer.write(record, encoder); - encoder.flush(); - Event e = EventBuilder.withBody(out.toByteArray()); - if (schemaLocation == SchemaLocation.URL) { - String schemaURI = new File(schemaPath).getAbsoluteFile().toURI().toString(); - e.setHeaders(ImmutableMap.of(SCHEMA_URL_HEADER, schemaURI)); - } else if (schemaLocation == SchemaLocation.LITERAL) { - e.setHeaders(ImmutableMap.of(SCHEMA_LITERAL_HEADER, schemaLiteral)); - } - channel.put(e); - } - } - - private List<String> makeAnswers(int eventCount) { - List<String> answers = Lists.newArrayList(); - for (int i = 0; i < eventCount; i++) { - answers.add(String.format( - "INT32 key=%s, INT64 longField=%s, DOUBLE doubleField=%s, " + - "STRING nullableField=%s, STRING stringField=hello %s", - 10 * i, - 2 * i, - 2.71828 * i, - i % 2 == 0 ? 
"NULL" : "taco", - i)); - } - Collections.sort(answers); - return answers; - } -} http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java new file mode 100644 index 0000000..2465428 --- /dev/null +++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.kudu.flume.sink; + +import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_LITERAL_HEADER; +import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_PROP; +import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_URL_HEADER; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX; +import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.apache.flume.Sink; +import org.apache.flume.Transaction; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.BaseKuduTest; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.KuduTable; +import org.junit.BeforeClass; +import org.junit.Test; + +public 
class AvroKuduOperationsProducerTest extends BaseKuduTest { + private static final String schemaPath = "src/test/avro/testAvroKuduOperationsProducer.avsc"; + private static String schemaLiteral; + + enum SchemaLocation { + GLOBAL, URL, LITERAL + } + + @BeforeClass + public static void setupAvroSchemaBeforeClass() { + try { + schemaLiteral = Files.toString(new File(schemaPath), Charsets.UTF_8); + } catch (IOException e) { + throw new FlumeException("Unable to read schema file!", e); + } + } + + @Test + public void testEmptyChannel() throws Exception { + testEvents(0, SchemaLocation.GLOBAL); + } + + @Test + public void testOneEvent() throws Exception { + testEvents(1, SchemaLocation.GLOBAL); + } + + @Test + public void testThreeEvents() throws Exception { + testEvents(3, SchemaLocation.GLOBAL); + } + + @Test + public void testThreeEventsSchemaURLInEvent() throws Exception { + testEvents(3, SchemaLocation.URL); + } + + @Test + public void testThreeEventsSchemaLiteralInEvent() throws Exception { + testEvents(3, SchemaLocation.LITERAL); + } + + private void testEvents(int eventCount, SchemaLocation schemaLocation) + throws Exception { + KuduTable table = createNewTable( + String.format("test%sevents%s", eventCount, schemaLocation)); + String tableName = table.getName(); + String schemaURI = new File(schemaPath).getAbsoluteFile().toURI().toString(); + Context ctx = schemaLocation != SchemaLocation.GLOBAL ? 
new Context() + : new Context(ImmutableMap.of(PRODUCER_PREFIX + SCHEMA_PROP, schemaURI)); + KuduSink sink = createSink(tableName, ctx); + + Channel channel = new MemoryChannel(); + Configurables.configure(channel, new Context()); + sink.setChannel(channel); + sink.start(); + + Transaction tx = channel.getTransaction(); + tx.begin(); + writeEventsToChannel(channel, eventCount, schemaLocation); + tx.commit(); + tx.close(); + + Sink.Status status = sink.process(); + if (eventCount == 0) { + assertEquals("incorrect status for empty channel", status, Sink.Status.BACKOFF); + } else { + assertEquals("incorrect status for non-empty channel", status, Sink.Status.READY); + } + + List<String> answers = makeAnswers(eventCount); + List<String> rows = scanTableToStrings(table); + assertEquals("wrong number of rows inserted", answers.size(), rows.size()); + assertArrayEquals("wrong rows inserted", answers.toArray(), rows.toArray()); + } + + private KuduTable createNewTable(String tableName) throws Exception { + ArrayList<ColumnSchema> columns = new ArrayList<>(5); + columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("longField", Type.INT64).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING).nullable(true).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build()); + CreateTableOptions createOptions = + new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key")) + .setNumReplicas(1); + return createTable(tableName, new Schema(columns), createOptions); + } + + private KuduSink createSink(String tableName, Context ctx) { + KuduSink sink = new KuduSink(syncClient); + HashMap<String, String> parameters = new HashMap<>(); + parameters.put(TABLE_NAME, tableName); + parameters.put(MASTER_ADDRESSES, getMasterAddresses()); 
+ parameters.put(PRODUCER, AvroKuduOperationsProducer.class.getName()); + Context context = new Context(parameters); + context.putAll(ctx.getParameters()); + Configurables.configure(sink, context); + + return sink; + } + + private void writeEventsToChannel(Channel channel, int eventCount, + SchemaLocation schemaLocation) throws Exception { + for (int i = 0; i < eventCount; i++) { + AvroKuduOperationsProducerTestRecord record = new AvroKuduOperationsProducerTestRecord(); + record.setKey(10 * i); + record.setLongField(2L * i); + record.setDoubleField(2.71828 * i); + record.setNullableField(i % 2 == 0 ? null : "taco"); + record.setStringField(String.format("hello %d", i)); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); + DatumWriter<AvroKuduOperationsProducerTestRecord> writer = + new SpecificDatumWriter<>(AvroKuduOperationsProducerTestRecord.class); + writer.write(record, encoder); + encoder.flush(); + Event e = EventBuilder.withBody(out.toByteArray()); + if (schemaLocation == SchemaLocation.URL) { + String schemaURI = new File(schemaPath).getAbsoluteFile().toURI().toString(); + e.setHeaders(ImmutableMap.of(SCHEMA_URL_HEADER, schemaURI)); + } else if (schemaLocation == SchemaLocation.LITERAL) { + e.setHeaders(ImmutableMap.of(SCHEMA_LITERAL_HEADER, schemaLiteral)); + } + channel.put(e); + } + } + + private List<String> makeAnswers(int eventCount) { + List<String> answers = Lists.newArrayList(); + for (int i = 0; i < eventCount; i++) { + answers.add(String.format( + "INT32 key=%s, INT64 longField=%s, DOUBLE doubleField=%s, " + + "STRING nullableField=%s, STRING stringField=hello %s", + 10 * i, + 2 * i, + 2.71828 * i, + i % 2 == 0 ? 
"NULL" : "taco", + i)); + } + Collections.sort(answers); + return answers; + } +} http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduEventProducerTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduEventProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduEventProducerTest.java deleted file mode 100644 index bf56ca3..0000000 --- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduEventProducerTest.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.kudu.flume.sink; - -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -import org.apache.flume.Channel; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.Sink; -import org.apache.flume.Transaction; -import org.apache.flume.channel.MemoryChannel; -import org.apache.flume.conf.Configurables; -import org.apache.flume.event.EventBuilder; -import org.apache.kudu.ColumnSchema; -import org.apache.kudu.Schema; -import org.apache.kudu.Type; -import org.apache.kudu.client.BaseKuduTest; -import org.apache.kudu.client.CreateTableOptions; -import org.apache.kudu.client.KuduTable; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class KeyedKuduEventProducerTest extends BaseKuduTest { - private static final Logger LOG = LoggerFactory.getLogger(KeyedKuduEventProducerTest.class); - - private KuduTable createNewTable(String tableName) throws Exception { - LOG.info("Creating new table..."); - - ArrayList<ColumnSchema> columns = new ArrayList<>(2); - columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build()); - columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(false).build()); - CreateTableOptions createOptions = - new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key")) - .setNumReplicas(1); - KuduTable table = createTable(tableName, new Schema(columns), createOptions); - - LOG.info("Created new table."); - - return table; - } - - @Test - public void testEmptyChannelWithInsert() throws Exception { - testEvents(0, "false"); - } - - @Test - public void testOneEventWithInsert() throws Exception { - testEvents(1, "false"); - } - - @Test - public 
void testThreeEventsWithInsert() throws Exception { - testEvents(3, "false"); - } - - @Test - public void testEmptyChannelWithUpsert() throws Exception { - testEvents(0, "true"); - } - - @Test - public void testOneEventWithUpsert() throws Exception { - testEvents(1, "true"); - } - - @Test - public void testThreeEventsWithUpsert() throws Exception { - testEvents(3, "true"); - } - - @Test - public void testDuplicateRowsWithUpsert() throws Exception { - LOG.info("Testing events with upsert..."); - - KuduTable table = createNewTable("testDupUpsertEvents"); - String tableName = table.getName(); - Context ctx = new Context(ImmutableMap.of("producer.upsert", "true")); - KuduSink sink = createSink(tableName, ctx); - - Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - sink.setChannel(channel); - sink.start(); - - Transaction tx = channel.getTransaction(); - tx.begin(); - - int numRows = 3; - for (int i = 0; i < numRows; i++) { - Event e = EventBuilder.withBody(String.format("payload body %s", i), Charsets.UTF_8); - e.setHeaders(ImmutableMap.of("key", String.format("key %s", i))); - channel.put(e); - } - - tx.commit(); - tx.close(); - - Sink.Status status = sink.process(); - assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF); - - List<String> rows = scanTableToStrings(table); - assertEquals(numRows + " row(s) expected", numRows, rows.size()); - - for (int i = 0; i < numRows; i++) { - assertTrue("incorrect payload", rows.get(i).contains("payload body " + i)); - } - - Transaction utx = channel.getTransaction(); - utx.begin(); - - Event dup = EventBuilder.withBody("payload body upserted".getBytes()); - dup.setHeaders(ImmutableMap.of("key", String.format("key %s", 0))); - channel.put(dup); - - utx.commit(); - utx.close(); - - Sink.Status upStatus = sink.process(); - assertTrue("incorrect status for non-empty channel", upStatus != Sink.Status.BACKOFF); - - List<String> upRows = scanTableToStrings(table); - 
assertEquals(numRows + " row(s) expected", numRows, upRows.size()); - - assertTrue("incorrect payload", upRows.get(0).contains("payload body upserted")); - for (int i = 1; i < numRows; i++) { - assertTrue("incorrect payload", upRows.get(i).contains("payload body " + i)); - } - - LOG.info("Testing events with upsert finished successfully."); - } - - private void testEvents(int eventCount, String upsert) throws Exception { - LOG.info("Testing {} events...", eventCount); - - KuduTable table = createNewTable("test" + eventCount + "eventsUp" + upsert); - String tableName = table.getName(); - Context ctx = new Context(ImmutableMap.of("producer.upsert", upsert)); - KuduSink sink = createSink(tableName, ctx); - - Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - sink.setChannel(channel); - sink.start(); - - Transaction tx = channel.getTransaction(); - tx.begin(); - - for (int i = 0; i < eventCount; i++) { - Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes()); - e.setHeaders(ImmutableMap.of("key", String.format("key %s", i))); - channel.put(e); - } - - tx.commit(); - tx.close(); - - Sink.Status status = sink.process(); - if (eventCount == 0) { - assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF); - } else { - assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF); - } - - List<String> rows = scanTableToStrings(table); - assertEquals(eventCount + " row(s) expected", eventCount, rows.size()); - - for (int i = 0; i < eventCount; i++) { - assertTrue("incorrect payload", rows.get(i).contains("payload body " + i)); - } - - LOG.info("Testing {} events finished successfully.", eventCount); - } - - private KuduSink createSink(String tableName, Context ctx) { - LOG.info("Creating Kudu sink for '{}' table...", tableName); - - KuduSink sink = new KuduSink(syncClient); - HashMap<String, String> parameters = new HashMap<>(); - 
parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName); - parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, getMasterAddresses()); - parameters.put(KuduSinkConfigurationConstants.PRODUCER, "org.apache.kudu.flume.sink.SimpleKeyedKuduEventProducer"); - Context context = new Context(parameters); - context.putAll(ctx.getParameters()); - Configurables.configure(sink, context); - - LOG.info("Created Kudu sink for '{}' table.", tableName); - - return sink; - } -}
