Repository: incubator-metron Updated Branches: refs/heads/master 27ee49096 -> 134a23311
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetters.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetters.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetters.java deleted file mode 100644 index da4a549..0000000 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetters.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.writer.message; - - -import org.apache.storm.tuple.Tuple; -import org.json.simple.JSONObject; - -public enum MessageGetters implements MessageGetter{ - RAW(RawMessageGetter.DEFAULT) - ,NAMED(NamedMessageGetter.DEFAULT) - ; - MessageGetter getter; - MessageGetters(MessageGetter getter) { - this.getter = getter; - } - @Override - public JSONObject getMessage(Tuple t) { - return getter.getMessage(t); - } -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/NamedMessageGetter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/NamedMessageGetter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/NamedMessageGetter.java deleted file mode 100644 index fdd5fb8..0000000 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/NamedMessageGetter.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.writer.message; - -import org.apache.storm.tuple.Tuple; -import org.json.simple.JSONObject; - -public class NamedMessageGetter implements MessageGetter { - public static NamedMessageGetter DEFAULT = new NamedMessageGetter("message"); - private String messageName; - public NamedMessageGetter(String name) { - this.messageName = name; - } - @Override - public JSONObject getMessage(Tuple tuple) { - return (JSONObject)tuple.getValueByField(messageName); - } -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/RawMessageGetter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/RawMessageGetter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/RawMessageGetter.java deleted file mode 100644 index 99a8378..0000000 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/RawMessageGetter.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.writer.message; - -import org.apache.storm.tuple.Tuple; -import org.apache.metron.common.utils.JSONUtils; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; - -import java.io.UnsupportedEncodingException; - -public class RawMessageGetter implements MessageGetter { - public static RawMessageGetter DEFAULT = new RawMessageGetter(0); - private ThreadLocal<JSONParser> parser = new ThreadLocal<JSONParser>() { - @Override - protected JSONParser initialValue() { - return new JSONParser(); - } - }; - int position = 0; - public RawMessageGetter(int position) { - this.position = position; - } - @Override - public JSONObject getMessage(Tuple t) { - byte[] data = t.getBinary(position); - try { - return (JSONObject) parser.get().parse(new String(data, "UTF8")); - } catch (Exception e) { - throw new IllegalStateException(e.getMessage(), e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java new file mode 100644 index 0000000..c560b30 --- /dev/null +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.writer; + +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.error.MetronError; +import org.apache.metron.common.message.MessageGetStrategy; +import org.apache.metron.common.message.MessageGetters; +import org.apache.metron.common.utils.ErrorUtils; +import org.apache.metron.common.writer.BulkMessageWriter; +import org.apache.metron.common.writer.BulkWriterResponse; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; +import org.json.simple.JSONObject; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.verifyStatic; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({BulkWriterComponent.class, ErrorUtils.class}) +public class BulkWriterComponentTest { + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @Mock + private OutputCollector collector; + + @Mock + private BulkMessageWriter<JSONObject> bulkMessageWriter; + + @Mock + private WriterConfiguration configurations; + + @Mock + private Tuple tuple1; + + @Mock + private Tuple tuple2; + + @Mock + private MessageGetStrategy messageGetStrategy; + + private String sensorType = "testSensor"; + private List<Tuple> tupleList; + private JSONObject message1 = new JSONObject(); + private JSONObject message2 = new JSONObject(); + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + mockStatic(ErrorUtils.class); + message1.put("value", "message1"); + message2.put("value", "message2"); + when(tuple1.getValueByField("message")).thenReturn(message1); + when(tuple2.getValueByField("message")).thenReturn(message2); + tupleList = Arrays.asList(tuple1, tuple2); + when(configurations.isEnabled(any())).thenReturn(true); + when(configurations.getBatchSize(any())).thenReturn(2); + when(messageGetStrategy.get(tuple1)).thenReturn(message1); + when(messageGetStrategy.get(tuple2)).thenReturn(message2); + } + + @Test + public void writeShouldProperlyAckTuplesInBatch() throws Exception { + BulkWriterResponse response = new BulkWriterResponse(); + response.addAllSuccesses(tupleList); + + when(bulkMessageWriter.write(sensorType, configurations, Arrays.asList(tuple1, tuple2), Arrays.asList(message1, message2))).thenReturn(response); + + BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector); + bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy); + + verify(bulkMessageWriter, times(0)).write(sensorType, configurations, Collections.singletonList(tuple1), Collections.singletonList(message1)); + verify(collector, times(0)).ack(tuple1); + verify(collector, times(0)).ack(tuple2); + + bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy); + + verify(collector, times(1)).ack(tuple1); + verify(collector, times(1)).ack(tuple2); + verifyStatic(times(0)); + ErrorUtils.handleError(eq(collector), any(MetronError.class)); + } + + @Test + public void writeShouldProperlyHandleWriterErrors() throws Exception { + Throwable e = new Exception("test exception"); + MetronError error = new MetronError() + .withSensorType(sensorType) + .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2)); + BulkWriterResponse response = new BulkWriterResponse(); + response.addAllErrors(e, tupleList); + + when(bulkMessageWriter.write(sensorType, configurations, Arrays.asList(tuple1, tuple2), Arrays.asList(message1, message2))).thenReturn(response); + + BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector); + bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy); + bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy); + + verifyStatic(times(1)); + ErrorUtils.handleError(collector, error); + } + + @Test + public void writeShouldThrowExceptionWhenHandleErrorIsFalse() throws Exception { + exception.expect(IllegalStateException.class); + + Throwable e = new Exception("test exception"); + BulkWriterResponse response = new BulkWriterResponse(); + response.addAllErrors(e, tupleList); + + when(bulkMessageWriter.write(sensorType, configurations, Arrays.asList(tuple1, tuple2), Arrays.asList(message1, message2))).thenReturn(response); + + BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector, true, false); + bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy); + bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy); + } + + @Test + public void writeShouldProperlyHandleWriterException() throws Exception { + Throwable e = new Exception("test exception"); + MetronError error = new MetronError() + .withSensorType(sensorType) + .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2)); + BulkWriterResponse response = new BulkWriterResponse(); + response.addAllErrors(e, tupleList); + + when(bulkMessageWriter.write(sensorType, configurations, Arrays.asList(tuple1, tuple2), Arrays.asList(message1, message2))).thenThrow(e); + + BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector); + bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy); + bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy); + + verifyStatic(times(1)); + ErrorUtils.handleError(collector, error); + } + + @Test + public void errorAllShouldClearMapsAndHandleErrors() throws Exception { + Throwable e = new Exception("test exception"); + MetronError error1 = new MetronError() + .withSensorType("sensor1") + .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1)); + MetronError error2 = new MetronError() + .withSensorType("sensor2") + .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2)); + + BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector); + bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy); + bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy); + bulkWriterComponent.errorAll(e, messageGetStrategy); + + verifyStatic(times(1)); + ErrorUtils.handleError(collector, error1); + ErrorUtils.handleError(collector, error2); + + bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy); + verify(bulkMessageWriter, times(0)).write(sensorType, configurations, Collections.singletonList(tuple1), Collections.singletonList(message1)); + } + + +}