Author: daijy
Date: Thu Oct 29 04:34:18 2015
New Revision: 1711177
URL: http://svn.apache.org/viewvc?rev=1711177&view=rev
Log:
PIG-4704: Customizable Error Handling for Storers in Pig
Added:
pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java
pig/trunk/src/org/apache/pig/ErrorHandler.java
pig/trunk/src/org/apache/pig/ErrorHandling.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
pig/trunk/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/conf/pig.properties
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1711177&r1=1711176&r2=1711177&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 29 04:34:18 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4704: Customizable Error Handling for Storers in Pig (siddhimehta via
daijy)
+
PIG-4717: Update Apache HTTPD LogParser to latest version (nielsbasjes via
daijy)
PIG-4468: Pig's jackson version conflicts with that of hadoop 2.6.0 or newer
(zjffdu via daijy)
Modified: pig/trunk/conf/pig.properties
URL:
http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1711177&r1=1711176&r2=1711177&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Thu Oct 29 04:34:18 2015
@@ -568,6 +568,22 @@ hcat.bin=/usr/local/hcat/bin/hcat
#
# opt.fetch=true
+#########################################################################
+#
+# Error Handling Properties
+#
+# By default, a Pig job fails immediately on encountering an error while writing Tuples for Store.
+# If you want Pig to allow certain errors before failing, you can set this property.
+# If the property is set to true and the StoreFunc implements ErrorHandling, it will allow configurable errors
+# based on the ErrorHandler implementation
+# pig.allow.store.errors = false
+#
+# Controls the minimum number of store errors that must occur before the error threshold is enforced
+# pig.errors.min.records = 0
+#
+# Set the threshold for the fraction of records allowed to fail (errors / records, e.g. 0.1 tolerates 10% errors)
+# pig.error.threshold.percent = 0.0f
+
###########################################################################
#
# Streaming properties
Added: pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java?rev=1711177&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java (added)
+++ pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java Thu Oct 29
04:34:18 2015
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+public class CounterBasedErrorHandler implements ErrorHandler {
+
+ public static final String STORER_ERROR_HANDLER_COUNTER_GROUP =
"storer_Error_Handler";
+ public static final String STORER_ERROR_COUNT = "bad_record_count";
+ public static final String STORER_RECORD_COUNT = "record__count";
+
+ private final long minErrors;
+ private final float errorThreshold; // max tolerated errors/records ratio (compared as a fraction, not as a percent value)
+
+ public CounterBasedErrorHandler() {
+ Configuration conf = UDFContext.getUDFContext().getJobConf();
+ this.minErrors = conf.getLong(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
+ 0);
+ this.errorThreshold = conf.getFloat(
+ PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT, 0.0f);
+ }
+
+ @Override
+ public void onSuccess(String uniqueSignature) {
+ incAndGetCounter(uniqueSignature, STORER_RECORD_COUNT);
+ }
+
+ @Override
+ public void onError(String uniqueSignature, Exception e, Tuple inputTuple)
{
+ long numErrors = incAndGetCounter(uniqueSignature, STORER_ERROR_COUNT);
+ long numRecords = incAndGetCounter(uniqueSignature,
STORER_RECORD_COUNT);
+ boolean exceedThreshold = hasErrorExceededThreshold(numErrors,
+ numRecords);
+ if (exceedThreshold) {
+ throw new RuntimeException(
+ "Exceeded the error rate while writing records. The latest
error seen ",
+ e);
+ }
+ }
+
+ private boolean hasErrorExceededThreshold(long numErrors, long numRecords)
{
+ if (numErrors > 0 && errorThreshold <= 0) { // a non-positive threshold means zero tolerance: fail on the first error
+ return true;
+ }
+ double errRate = numErrors / (double) numRecords;
+ // Fail only when both conditions hold: at least minErrors errors were seen
+ // and the observed error rate is strictly above the configured threshold
+ if (numErrors >= minErrors && errRate > errorThreshold) {
+ return true;
+ }
+ return false;
+ }
+
+ public long getRecordCount(String storeSignature) {
+ Counter counter = getCounter(storeSignature, STORER_RECORD_COUNT);
+ return counter.getValue();
+ }
+
+ private long incAndGetCounter(String storeSignature, String counterName) {
+ Counter counter = getCounter(storeSignature, counterName);
+ counter.increment(1);
+ return counter.getValue();
+ }
+
+ /**
+ * Looks up the counter for a given store in the
+ * {@code STORER_ERROR_HANDLER_COUNTER_GROUP} group through the
+ * {@link PigStatusReporter} instance. The counter name passed to the
+ * reporter is {@code counterName + "_" + storeSignature}.
+ *
+ * @param storeSignature
+ *            signature of the store the counter belongs to
+ * @param counterName
+ *            counter name prefix, e.g. {@code STORER_ERROR_COUNT}
+ * @return the counter object returned by the reporter
+ */
+ private Counter getCounter(String storeSignature, String counterName) {
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ @SuppressWarnings("deprecation")
+ Counter counter = reporter.getCounter(
+ STORER_ERROR_HANDLER_COUNTER_GROUP,
+ getCounterNameForStore(counterName, storeSignature));
+ return counter;
+ }
+
+ private String getCounterNameForStore(String counterNamePrefix,
+ String storeSignature) {
+ StringBuilder counterName = new StringBuilder()
+ .append(counterNamePrefix).append("_").append(storeSignature);
+ return counterName.toString();
+ }
+}
Added: pig/trunk/src/org/apache/pig/ErrorHandler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandler.java?rev=1711177&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/ErrorHandler.java (added)
+++ pig/trunk/src/org/apache/pig/ErrorHandler.java Thu Oct 29 04:34:18 2015
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import org.apache.pig.data.Tuple;
+
+/**
+ * Callback interface for reacting to the outcome of
+ * {@link StoreFuncInterface#putNext(Tuple)}: one method is invoked for a
+ * tuple that was processed successfully, the other for a tuple whose
+ * processing raised an exception.
+ *
+ */
+public interface ErrorHandler {
+ /**
+ * Invoked after a tuple has been processed successfully.
+ *
+ * @param uniqueSignature
+ * a unique signature to identify the operator
+ */
+ public void onSuccess(String uniqueSignature);
+
+ /**
+ * Invoked when an error occurs while processing a tuple.
+ *
+ * @param uniqueSignature
+ * a unique signature to identify the operator
+ * @param e
+ * Exception encountered while processing input
+ * @param inputTuple
+ * the tuple to store.
+ */
+ public void onError(String uniqueSignature, Exception e, Tuple inputTuple);
+}
Added: pig/trunk/src/org/apache/pig/ErrorHandling.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandling.java?rev=1711177&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/ErrorHandling.java (added)
+++ pig/trunk/src/org/apache/pig/ErrorHandling.java Thu Oct 29 04:34:18 2015
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+/**
+ * A {@link StoreFunc} should implement this interface to enable handling
+ * errors during {@code StoreFunc#putNext(Tuple)}
+ *
+ */
+public interface ErrorHandling {
+
+ /**
+ * This method is called to determine the {@link ErrorHandler}
+ * implementation that is used to handle errors in
+ * {@code StoreFunc#putNext(Tuple)}
+ *
+ * @return ErrorHandler implementation for store func
+ */
+ ErrorHandler getErrorHandler();
+
+}
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1711177&r1=1711176&r2=1711177&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Thu Oct 29 04:34:18 2015
@@ -315,6 +315,21 @@ public class PigConfiguration {
public static final String PIG_USER_CACHE_REPLICATION =
"pig.user.cache.replication";
/**
+ * Boolean value used to enable or disable error handling for storers
+ */
+ public static final String PIG_ALLOW_STORE_ERRORS =
"pig.allow.store.errors";
+
+ /**
+ * Controls the minimum number of store errors that must occur before the error threshold is enforced
+ */
+ public static final String PIG_ERRORS_MIN_RECORDS =
"pig.errors.min.records";
+
+ /**
+ * Set the threshold for the fraction of records allowed to fail (errors / records)
+ */
+ public static final String PIG_ERROR_THRESHOLD_PERCENT =
"pig.error.threshold.percent";
+
+ /**
* Comma-delimited entries of commands/operators that must be disallowed.
* This is a security feature to be used by administrators to block use of
* commands by users. For eg, an admin might like to block all filesystem
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1711177&r1=1711176&r2=1711177&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
Thu Oct 29 04:34:18 2015
@@ -34,6 +34,7 @@ import org.apache.pig.OverwritableStoreF
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.StoreFuncDecorator;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
@@ -74,13 +75,14 @@ public class PigOutputFormat extends Out
store = reduceStores.get(0);
}
StoreFuncInterface sFunc = store.getStoreFunc();
+ StoreFuncDecorator decorator = store.getStoreFuncDecorator();
// set output location
PigOutputFormat.setLocation(taskattemptcontext, store);
// The above call should have update the conf in the JobContext
// to have the output location - now call checkOutputSpecs()
RecordWriter writer = sFunc.getOutputFormat().getRecordWriter(
taskattemptcontext);
- return new PigRecordWriter(writer, sFunc, Mode.SINGLE_STORE);
+ return new PigRecordWriter(writer, decorator, Mode.SINGLE_STORE);
} else {
// multi store case - in this case, all writing is done through
// MapReducePOStoreImpl - set up a dummy RecordWriter
@@ -107,18 +109,24 @@ public class PigOutputFormat extends Out
private StoreFuncInterface sFunc;
/**
+ * The StoreFuncDecorator we use to write Tuples
+ */
+ private StoreFuncDecorator storeDecorator;
+
+ /**
* Single Query or multi query
*/
private Mode mode;
- public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface
sFunc,
+ public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncDecorator
storeDecorator,
Mode mode)
throws IOException {
this.mode = mode;
if(mode == Mode.SINGLE_STORE) {
this.wrappedWriter = wrappedWriter;
- this.sFunc = sFunc;
+ this.sFunc = storeDecorator.getStorer();
+ this.storeDecorator = storeDecorator;
this.sFunc.prepareToWrite(this.wrappedWriter);
}
}
@@ -133,7 +141,7 @@ public class PigOutputFormat extends Out
public void write(WritableComparable key, Tuple value)
throws IOException, InterruptedException {
if(mode == Mode.SINGLE_STORE) {
- sFunc.putNext(value);
+ storeDecorator.putNext(value);
} else {
throw new IOException("Internal Error: Unexpected code path");
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1711177&r1=1711176&r2=1711177&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
Thu Oct 29 04:34:18 2015
@@ -52,6 +52,7 @@ public class POStore extends PhysicalOpe
private static final long serialVersionUID = 1L;
protected static Result empty = new Result(POStatus.STATUS_NULL, null);
transient private StoreFuncInterface storer;
+ transient private StoreFuncDecorator sDecorator;
transient private POStoreImpl impl;
transient private String counterName = null;
private FileSpec sFile;
@@ -116,7 +117,7 @@ public class POStore extends PhysicalOpe
public void setUp() throws IOException{
if (impl != null) {
try{
- storer = impl.createStoreFunc(this);
+ storer = impl.createStoreFunc(this);
if (!isTmpStore && !disableCounter && impl instanceof
MapReducePOStoreImpl) {
counterName = PigStatsUtil.getMultiStoreCounterName(this);
if (counterName != null) {
@@ -161,7 +162,7 @@ public class POStore extends PhysicalOpe
switch (res.returnStatus) {
case POStatus.STATUS_OK:
if (illustrator == null) {
- storer.putNext((Tuple)res.result);
+ sDecorator.putNext((Tuple) res.result);
} else
illustratorMarkup(res.result, res.result, 0);
res = empty;
@@ -250,11 +251,25 @@ public class POStore extends PhysicalOpe
if(storer == null){
storer =
(StoreFuncInterface)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
storer.setStoreFuncUDFContextSignature(signature);
+ // Init the Decorator we use for writing Tuples
+ setStoreFuncDecorator(new StoreFuncDecorator(storer, signature));
}
return storer;
}
+
+ void setStoreFuncDecorator(StoreFuncDecorator sDecorator) {
+ this.sDecorator = sDecorator;
+ }
/**
+ *
+ * @return The {@link StoreFuncDecorator} used to write Tuples
+ */
+ public StoreFuncDecorator getStoreFuncDecorator() {
+ return sDecorator;
+ }
+
+ /**
* @param sortInfo the sortInfo to set
*/
public void setSortInfo(SortInfo sortInfo) {
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java?rev=1711177&view=auto
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
(added)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
Thu Oct 29 04:34:18 2015
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+
+import org.apache.pig.ErrorHandling;
+import org.apache.pig.ErrorHandler;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Decorates {@code StoreFunc#putNext(Tuple)} with optional error handling.
+ * When the wrapped storer implements {@link ErrorHandling} and
+ * {@link PigConfiguration#PIG_ALLOW_STORE_ERRORS} is enabled, every store
+ * attempt is reported to the storer's {@link ErrorHandler} through
+ * {@link ErrorHandler#onSuccess(String)} or
+ * {@link ErrorHandler#onError(String, Exception, Tuple)}. Otherwise any
+ * failure is propagated to the caller as an {@link IOException}.
+ *
+ */
+public class StoreFuncDecorator {
+
+    private final StoreFuncInterface storer;
+    private final String udfSignature;
+    private boolean shouldHandleErrors;
+    private ErrorHandler errorHandler;
+
+    /**
+     * @param storer
+     *            the store func whose {@code putNext} is decorated
+     * @param udfSignature
+     *            signature passed to the error handler to identify the store
+     */
+    public StoreFuncDecorator(StoreFuncInterface storer, String udfSignature) {
+        this.storer = storer;
+        this.udfSignature = udfSignature;
+        init();
+    }
+
+    private void init() {
+        // The decorators work is mainly on backend only so not creating error
+        // handler on frontend
+        if (UDFContext.getUDFContext().isFrontend()) {
+            return;
+        }
+        if (storer instanceof ErrorHandling && allowErrors()) {
+            errorHandler = ((ErrorHandling) storer).getErrorHandler();
+            // A storer that opts in but supplies no handler falls back to
+            // fail-fast instead of hitting a NullPointerException per tuple
+            shouldHandleErrors = errorHandler != null;
+        }
+    }
+
+    private boolean allowErrors() {
+        return UDFContext.getUDFContext().getJobConf()
+                .getBoolean(PigConfiguration.PIG_ALLOW_STORE_ERRORS, false);
+    }
+
+    /**
+     * Call {@code StoreFunc#putNext(Tuple)} and handle errors
+     *
+     * @param tuple
+     *            the tuple to store.
+     * @throws IOException
+     *             if error handling is not active and the store fails; an
+     *             {@code IOException} raised by the storer is rethrown as
+     *             is, any other exception is wrapped as its cause
+     */
+    public void putNext(Tuple tuple) throws IOException {
+        try {
+            storer.putNext(tuple);
+            if (shouldHandleErrors) {
+                errorHandler.onSuccess(udfSignature);
+            }
+        } catch (Exception e) {
+            if (shouldHandleErrors) {
+                errorHandler.onError(udfSignature, e, tuple);
+            } else if (e instanceof IOException) {
+                // Keep the storer's own IOException (type and message) intact
+                throw (IOException) e;
+            } else {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    /**
+     * @return the wrapped store func
+     */
+    public StoreFuncInterface getStorer() {
+        return storer;
+    }
+}
Added: pig/trunk/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java?rev=1711177&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java Thu Oct
29 04:34:18 2015
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.pig.CounterBasedErrorHandler;
+import org.apache.pig.ErrorHandling;
+import org.apache.pig.ExecType;
+import org.apache.pig.ErrorHandler;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/**
+ * This class contains Unit tests for store func which has a certain error
+ * threshold set.
+ *
+ */
+public class TestErrorHandlingStoreFunc {
+
+ private static PigServer pigServer;
+ private File tempDir;
+
+ @Before
+ public void setup() throws IOException {
+ pigServer = new PigServer(ExecType.LOCAL);
+ tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ pigServer.shutdown();
+ tempDir.delete();
+ }
+
+ public static class TestErroroneousStoreFunc extends PigStorage implements
+ ErrorHandling {
+ protected static AtomicLong COUNTER = new AtomicLong();
+
+ @Override
+ public void putNext(Tuple f) throws IOException {
+ long count = COUNTER.incrementAndGet();
+ super.putNext(f);
+ if (count % 3 == 0) {
+ throw new RuntimeException("Throw error for test");
+ }
+ }
+
+ @Override
+ public ErrorHandler getErrorHandler() {
+ return new CounterBasedErrorHandler();
+ }
+ }
+
+ public static class TestErroroneousStoreFunc2 extends PigStorage implements
+ ErrorHandling {
+ protected static AtomicLong COUNTER = new AtomicLong();
+
+ @Override
+ public void putNext(Tuple f) throws IOException {
+ long count = COUNTER.incrementAndGet();
+ super.putNext(f);
+ if (count % 3 == 0) {
+ throw new RuntimeException("Throw error for test");
+ }
+ }
+
+ @Override
+ public ErrorHandler getErrorHandler() {
+ return new CounterBasedErrorHandler();
+ }
+ }
+
+ /**
+ * Test Pig job succeeds even with errors within threshold
+ *
+ */
+ @Test
+ public void testStorerWithErrorInLimit() throws Exception {
+ updatePigProperties(true, 3L, 0.4);
+ runTest(JOB_STATUS.COMPLETED);
+ }
+
+ /**
+ * Test Pig job fails if errors exceed min errors and threshold
+ */
+ @Test
+ public void testStorerWithErrorOutExceedingLimit() throws Exception {
+ updatePigProperties(true, 2L, 0.3);
+ runTest(JOB_STATUS.FAILED);
+ }
+
+ /**
+ * Test Pig Job fails on error if the config is set to false
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testStorerWithConfigNotEnabled() throws Exception {
+ updatePigProperties(false, 3L, 0.3);
+ runTest(JOB_STATUS.FAILED);
+ }
+
+ /**
+ * Test Pig Job with multiple stores.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMultiStore() throws Exception {
+ updatePigProperties(true, 3L, 0.4);
+ pigServer.getPigContext().getProperties()
+ .put(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+ Data data = Storage.resetData(pigServer);
+ final Collection<Tuple> list = Lists.newArrayList();
+ // Create input dataset
+ int rows = 10;
+ for (int i = 0; i < rows; i++) {
+ Tuple t = TupleFactory.getInstance().newTuple();
+ t.append(i);
+ t.append("a" + i);
+ list.add(t);
+ }
+ data.set("in", "id:int,name:chararray", list);
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+ String storeAQuery = "store A into '" + tempDir.getAbsolutePath()
+ + "/output' using " + TestErroroneousStoreFunc.class.getName()
+ + "();";
+ pigServer.registerQuery(storeAQuery);
+ pigServer.registerQuery("B = FILTER A by id >0;");
+ String storeBQuery = "store B into '" + tempDir.getAbsolutePath()
+ + "/output2' using "
+ + TestErroroneousStoreFunc2.class.getName() + "();";
+ pigServer.registerQuery(storeBQuery);
+ if (pigServer.executeBatch().get(0).getStatus() !=
JOB_STATUS.COMPLETED) {
+ throw new RuntimeException("Job failed", pigServer.executeBatch()
+ .get(0).getException());
+ }
+ }
+
+ private void runTest(JOB_STATUS expectedJobStatus) throws Exception {
+ Data data = Storage.resetData(pigServer);
+ final Collection<Tuple> list = Lists.newArrayList();
+ // Create input dataset
+ int rows = 10;
+ for (int i = 0; i < rows; i++) {
+ Tuple t = TupleFactory.getInstance().newTuple();
+ t.append(i);
+ t.append("a" + i);
+ list.add(t);
+ }
+ data.set("in", "id:int,name:chararray", list);
+
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+ String storeQuery = "store A into '" + tempDir.getAbsolutePath()
+ + "/output' using " + TestErroroneousStoreFunc.class.getName()
+ + "();";
+ pigServer.registerQuery(storeQuery);
+
+ if (pigServer.executeBatch().get(0).getStatus() != expectedJobStatus) {
+ throw new RuntimeException("Job did not reach the expected status"
+ + pigServer.executeBatch().get(0).getStatus());
+ }
+ }
+
+ private void updatePigProperties(boolean allowErrors, long minErrors,
+ double errorThreshold) {
+ Properties properties = pigServer.getPigContext().getProperties();
+ properties.put(PigConfiguration.PIG_ALLOW_STORE_ERRORS,
+ Boolean.toString(allowErrors));
+ properties.put(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
+ Long.toString(minErrors));
+ properties.put(PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT,
+ Double.toString(errorThreshold));
+ }
+}