Author: daijy
Date: Thu Oct 29 04:34:18 2015
New Revision: 1711177

URL: http://svn.apache.org/viewvc?rev=1711177&view=rev
Log:
PIG-4704: Customizable Error Handling for Storers in Pig

Added:
    pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java
    pig/trunk/src/org/apache/pig/ErrorHandler.java
    pig/trunk/src/org/apache/pig/ErrorHandling.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
    pig/trunk/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1711177&r1=1711176&r2=1711177&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 29 04:34:18 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4704: Customizable Error Handling for Storers in Pig (siddhimehta via 
daijy)
+
 PIG-4717: Update Apache HTTPD LogParser to latest version (nielsbasjes via 
daijy)
 
 PIG-4468: Pig's jackson version conflicts with that of hadoop 2.6.0 or newer 
(zjffdu via daijy)

Modified: pig/trunk/conf/pig.properties
URL: 
http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1711177&r1=1711176&r2=1711177&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Thu Oct 29 04:34:18 2015
@@ -568,6 +568,22 @@ hcat.bin=/usr/local/hcat/bin/hcat
 #
 # opt.fetch=true
 
+#########################################################################
+#
+# Error Handling Properties
+#
+# By default, a Pig job fails immediately when it encounters an error while
+# writing tuples for a store.
+# If you want Pig to tolerate some errors before failing, set this property.
+# If the property is set to true and the StoreFunc implements ErrorHandling, it
+# will tolerate errors according to the StoreFunc's ErrorHandler implementation.
+# pig.allow.store.errors = false
+#
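+# Minimum number of store errors that must occur before the error threshold
+# below is checked (used by CounterBasedErrorHandler)
+# pig.errors.min.records = 0
+#
+# Maximum tolerated error rate, expressed as a fraction of the records written
+# (e.g. 0.1 for 10%); 0.0 tolerates no errors at all
+# pig.error.threshold.percent = 0.0f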
+
 ###########################################################################
 #
 # Streaming properties

Added: pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java?rev=1711177&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java (added)
+++ pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java Thu Oct 29 
04:34:18 2015
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+/**
+ * {@link ErrorHandler} that counts successfully written and failed records in
+ * counters of the {@link #STORER_ERROR_HANDLER_COUNTER_GROUP} group and fails
+ * once the configured error budget is exceeded.
+ * <p>
+ * The budget is read from the job configuration:
+ * <ul>
+ * <li>{@link PigConfiguration#PIG_ERRORS_MIN_RECORDS}: the number of errors
+ * that must be reached before the error rate is checked (default 0);</li>
+ * <li>{@link PigConfiguration#PIG_ERROR_THRESHOLD_PERCENT}: the maximum
+ * tolerated error rate, expressed as a fraction of all records seen, e.g.
+ * {@code 0.1} for 10% (default 0.0). A value {@code <= 0} means that no error
+ * is tolerated at all, regardless of the minimum number of errors.</li>
+ * </ul>
+ */
+public class CounterBasedErrorHandler implements ErrorHandler {
+
+    public static final String STORER_ERROR_HANDLER_COUNTER_GROUP =
+            "storer_Error_Handler";
+    public static final String STORER_ERROR_COUNT = "bad_record_count";
+    // NOTE(review): the double underscore looks unintentional; the value is kept
+    // unchanged so that existing counter names stay stable.
+    public static final String STORER_RECORD_COUNT = "record__count";
+
+    /** Number of errors that must be reached before the error rate is checked. */
+    private final long minErrors;
+    /** Maximum tolerated fraction (not percentage) of failed records. */
+    private final float errorThreshold;
+
+    public CounterBasedErrorHandler() {
+        Configuration conf = UDFContext.getUDFContext().getJobConf();
+        this.minErrors = conf.getLong(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
+                0);
+        this.errorThreshold = conf.getFloat(
+                PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT, 0.0f);
+    }
+
+    /**
+     * Counts a successfully written record.
+     *
+     * @param uniqueSignature
+     *            signature of the store operator
+     */
+    @Override
+    public void onSuccess(String uniqueSignature) {
+        incAndGetCounter(uniqueSignature, STORER_RECORD_COUNT);
+    }
+
+    /**
+     * Counts a failed record, both as an error and as a record, and fails once
+     * the error budget is exceeded.
+     *
+     * @param uniqueSignature
+     *            signature of the store operator
+     * @param e
+     *            exception raised while writing the tuple
+     * @param inputTuple
+     *            the tuple that could not be stored (unused)
+     * @throws RuntimeException
+     *             if the errors exceed the configured limits; {@code e} is
+     *             attached as its cause
+     */
+    @Override
+    public void onError(String uniqueSignature, Exception e, Tuple inputTuple) {
+        long numErrors = incAndGetCounter(uniqueSignature, STORER_ERROR_COUNT);
+        long numRecords = incAndGetCounter(uniqueSignature,
+                STORER_RECORD_COUNT);
+        if (hasErrorExceededThreshold(numErrors, numRecords)) {
+            throw new RuntimeException(
+                    "Exceeded the error rate while writing records: "
+                            + numErrors + " errors out of " + numRecords
+                            + " records ("
+                            + PigConfiguration.PIG_ERRORS_MIN_RECORDS + "="
+                            + minErrors + ", "
+                            + PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT
+                            + "=" + errorThreshold
+                            + "). The latest error seen: " + e,
+                    e);
+        }
+    }
+
+    /**
+     * Decides whether the errors seen so far exceed the configured budget.
+     *
+     * @param numErrors
+     *            errors counted so far, including the current one
+     * @param numRecords
+     *            records counted so far (successes and errors)
+     * @return {@code true} if the store should fail
+     */
+    private boolean hasErrorExceededThreshold(long numErrors, long numRecords) {
+        if (numErrors > 0 && errorThreshold <= 0) {
+            // No errors are tolerated: fail on the first one, ignoring minErrors.
+            return true;
+        }
+        double errRate = numErrors / (double) numRecords;
+        // Only fail once at least minErrors errors were seen AND the rate is
+        // above the threshold.
+        return numErrors >= minErrors && errRate > errorThreshold;
+    }
+
+    /**
+     * Returns the number of records (successful and failed) counted for a store.
+     *
+     * @param storeSignature
+     *            signature of the store operator
+     * @return the current value of the record counter
+     */
+    public long getRecordCount(String storeSignature) {
+        Counter counter = getCounter(storeSignature, STORER_RECORD_COUNT);
+        return counter.getValue();
+    }
+
+    /** Increments the given counter of a store by one and returns its new value. */
+    private long incAndGetCounter(String storeSignature, String counterName) {
+        Counter counter = getCounter(storeSignature, counterName);
+        counter.increment(1);
+        return counter.getValue();
+    }
+
+    /**
+     * Returns the counter for a given counter name and store signature.
+     *
+     * @param storeSignature
+     *            signature of the store operator
+     * @param counterName
+     *            counter name, used as the prefix of the actual counter name
+     * @return the counter in the {@link #STORER_ERROR_HANDLER_COUNTER_GROUP}
+     *         group
+     */
+    private Counter getCounter(String storeSignature, String counterName) {
+        // NOTE(review): confirm the reporter always has a task context here;
+        // otherwise the returned counter might be null.
+        PigStatusReporter reporter = PigStatusReporter.getInstance();
+        @SuppressWarnings("deprecation")
+        Counter counter = reporter.getCounter(
+                STORER_ERROR_HANDLER_COUNTER_GROUP,
+                getCounterNameForStore(counterName, storeSignature));
+        return counter;
+    }
+
+    /** Builds the per-store counter name {@code <prefix>_<storeSignature>}. */
+    private String getCounterNameForStore(String counterNamePrefix,
+            String storeSignature) {
+        return counterNamePrefix + "_" + storeSignature;
+    }
+}

Added: pig/trunk/src/org/apache/pig/ErrorHandler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandler.java?rev=1711177&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/ErrorHandler.java (added)
+++ pig/trunk/src/org/apache/pig/ErrorHandler.java Thu Oct 29 04:34:18 2015
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import org.apache.pig.data.Tuple;
+
+/**
+ * Receives the outcome of each attempt to write a tuple with
+ * {@link StoreFuncInterface#putNext(Tuple)}.
+ * <p>
+ * Implementations decide which errors are acceptable; they may throw an
+ * unchecked exception from {@link #onError(String, Exception, Tuple)} to abort
+ * the store.
+ */
+public interface ErrorHandler {
+
+    /**
+     * Called after a tuple has been written successfully.
+     *
+     * @param uniqueSignature
+     *            signature that uniquely identifies the store operator
+     */
+    void onSuccess(String uniqueSignature);
+
+    /**
+     * Called when writing a tuple failed.
+     *
+     * @param uniqueSignature
+     *            signature that uniquely identifies the store operator
+     * @param e
+     *            exception raised while writing the tuple
+     * @param inputTuple
+     *            the tuple that could not be stored
+     */
+    void onError(String uniqueSignature, Exception e, Tuple inputTuple);
+}

Added: pig/trunk/src/org/apache/pig/ErrorHandling.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandling.java?rev=1711177&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/ErrorHandling.java (added)
+++ pig/trunk/src/org/apache/pig/ErrorHandling.java Thu Oct 29 04:34:18 2015
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+/**
+ * A {@link StoreFunc} should implement this interface to enable handling of
+ * errors raised during {@code StoreFunc#putNext(Tuple)}.
+ * <p>
+ * Error handling is only active when
+ * {@link PigConfiguration#PIG_ALLOW_STORE_ERRORS} is set to {@code true}.
+ */
+public interface ErrorHandling {
+
+    /**
+     * Returns the {@link ErrorHandler} that handles the errors raised in
+     * {@code StoreFunc#putNext(Tuple)}. Pig requests it on the backend only.
+     *
+     * @return the {@link ErrorHandler} implementation for this store func
+     */
+    ErrorHandler getErrorHandler();
+
+}

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1711177&r1=1711176&r2=1711177&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Thu Oct 29 04:34:18 2015
@@ -315,6 +315,21 @@ public class PigConfiguration {
     public static final String PIG_USER_CACHE_REPLICATION = 
"pig.user.cache.replication";
 
     /**
+     * Boolean value used to enable or disable error handling for storers
+     */
+    public static final String PIG_ALLOW_STORE_ERRORS = 
"pig.allow.store.errors";
+
+    /**
+     * Controls the minimum number of errors
+     */
+    public static final String PIG_ERRORS_MIN_RECORDS = 
"pig.errors.min.records";
+
+    /**
+     * Set the threshold for percentage of errors
+     */
+    public static final String PIG_ERROR_THRESHOLD_PERCENT = 
"pig.error.threshold.percent";
+    
+    /**
      * Comma-delimited entries of commands/operators that must be disallowed.
      * This is a security feature to be used by administrators to block use of
      * commands by users. For eg, an admin might like to block all filesystem

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1711177&r1=1711176&r2=1711177&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
 Thu Oct 29 04:34:18 2015
@@ -34,6 +34,7 @@ import org.apache.pig.OverwritableStoreF
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.StoreFuncDecorator;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
@@ -74,13 +75,14 @@ public class PigOutputFormat extends Out
                 store = reduceStores.get(0);
             }
             StoreFuncInterface sFunc = store.getStoreFunc();
+            StoreFuncDecorator decorator = store.getStoreFuncDecorator();
             // set output location
             PigOutputFormat.setLocation(taskattemptcontext, store);
             // The above call should have update the conf in the JobContext
             // to have the output location - now call checkOutputSpecs()
             RecordWriter writer = sFunc.getOutputFormat().getRecordWriter(
                     taskattemptcontext);
-            return new PigRecordWriter(writer, sFunc, Mode.SINGLE_STORE);
+            return new PigRecordWriter(writer, decorator, Mode.SINGLE_STORE);
         } else {
            // multi store case - in this case, all writing is done through
            // MapReducePOStoreImpl - set up a dummy RecordWriter
@@ -107,18 +109,24 @@ public class PigOutputFormat extends Out
         private StoreFuncInterface sFunc;
 
         /**
+         * The StoreFuncDecorator we use to write Tuples
+         */
+        private StoreFuncDecorator storeDecorator;
+
+        /**
          * Single Query or multi query
          */
         private Mode mode;
 
-        public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface 
sFunc,
+        public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncDecorator 
storeDecorator,
                 Mode mode)
                 throws IOException {
             this.mode = mode;
 
             if(mode == Mode.SINGLE_STORE) {
                 this.wrappedWriter = wrappedWriter;
-                this.sFunc = sFunc;
+                this.sFunc = storeDecorator.getStorer();
+                this.storeDecorator = storeDecorator;
                 this.sFunc.prepareToWrite(this.wrappedWriter);
             }
         }
@@ -133,7 +141,7 @@ public class PigOutputFormat extends Out
         public void write(WritableComparable key, Tuple value)
                 throws IOException, InterruptedException {
             if(mode == Mode.SINGLE_STORE) {
-                sFunc.putNext(value);
+                storeDecorator.putNext(value);
             } else {
                 throw new IOException("Internal Error: Unexpected code path");
             }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1711177&r1=1711176&r2=1711177&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
 Thu Oct 29 04:34:18 2015
@@ -52,6 +52,7 @@ public class POStore extends PhysicalOpe
     private static final long serialVersionUID = 1L;
     protected static Result empty = new Result(POStatus.STATUS_NULL, null);
     transient private StoreFuncInterface storer;
+    transient private StoreFuncDecorator sDecorator;
     transient private POStoreImpl impl;
     transient private String counterName = null;
     private FileSpec sFile;
@@ -116,7 +117,7 @@ public class POStore extends PhysicalOpe
     public void setUp() throws IOException{
         if (impl != null) {
             try{
-                storer = impl.createStoreFunc(this);
+                storer = impl.createStoreFunc(this); 
                 if (!isTmpStore && !disableCounter && impl instanceof 
MapReducePOStoreImpl) {
                     counterName = PigStatsUtil.getMultiStoreCounterName(this);
                     if (counterName != null) {
@@ -161,7 +162,7 @@ public class POStore extends PhysicalOpe
             switch (res.returnStatus) {
             case POStatus.STATUS_OK:
                 if (illustrator == null) {
-                    storer.putNext((Tuple)res.result);
+                    sDecorator.putNext((Tuple) res.result);
                 } else
                     illustratorMarkup(res.result, res.result, 0);
                 res = empty;
@@ -250,11 +251,25 @@ public class POStore extends PhysicalOpe
         if(storer == null){
             storer = 
(StoreFuncInterface)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
             storer.setStoreFuncUDFContextSignature(signature);
+            // Init the Decorator we use for writing Tuples
+            setStoreFuncDecorator(new StoreFuncDecorator(storer, signature));
         }
         return storer;
     }
+    
+    /**
+     * Sets the decorator this operator uses to write tuples.
+     *
+     * @param decorator
+     *            the decorator to use
+     */
+    void setStoreFuncDecorator(StoreFuncDecorator decorator) {
+        this.sDecorator = decorator;
+    }
 
     /**
+     * Returns the decorator this operator uses to write tuples. The backing
+     * field is transient, so this may return {@code null} if no decorator has
+     * been set yet on this instance.
+     *
+     * @return The {@link StoreFuncDecorator} used to write Tuples
+     */
+    public StoreFuncDecorator getStoreFuncDecorator() {
+        return this.sDecorator;
+    }
+    
+    /**
      * @param sortInfo the sortInfo to set
      */
     public void setSortInfo(SortInfo sortInfo) {

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java?rev=1711177&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
 Thu Oct 29 04:34:18 2015
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+
+import org.apache.pig.ErrorHandling;
+import org.apache.pig.ErrorHandler;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * This class is used to decorate the {@code StoreFunc#putNext(Tuple)}. It
+ * handles errors by calling
+ * {@code OutputErrorHandler#handle(String, long, Throwable)} if the
+ * {@link StoreFunc} implements {@link ErrorHandling}
+ * 
+ */
+public class StoreFuncDecorator {
+
+    private final StoreFuncInterface storer;
+    private final String udfSignature;
+    private boolean shouldHandleErrors;
+    private ErrorHandler errorHandler;
+
+    public StoreFuncDecorator(StoreFuncInterface storer, String udfSignature) {
+        this.storer = storer;
+        this.udfSignature = udfSignature;
+        init();
+    }
+
+    private void init() {
+        // The decorators work is mainly on backend only so not creating error
+        // handler on frontend
+        if (UDFContext.getUDFContext().isFrontend()) {
+            return;
+        }
+        if (storer instanceof ErrorHandling && allowErrors()) {
+            errorHandler = ((ErrorHandling) storer).getErrorHandler();
+            shouldHandleErrors = true;
+        }
+    }
+
+    private boolean allowErrors() {
+        return UDFContext.getUDFContext().getJobConf()
+                .getBoolean(PigConfiguration.PIG_ALLOW_STORE_ERRORS, false);
+    }
+
+    /**
+     * Call {@code StoreFunc#putNext(Tuple)} and handle errors
+     * 
+     * @param tuple
+     *            the tuple to store.
+     * @throws IOException
+     */
+    public void putNext(Tuple tuple) throws IOException {
+        try {
+            storer.putNext(tuple);
+            if (shouldHandleErrors) {
+                errorHandler.onSuccess(udfSignature);
+            }
+        } catch (Exception e) {
+            if (shouldHandleErrors) {
+                errorHandler.onError(udfSignature, e, tuple);
+            } else {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    public StoreFuncInterface getStorer() {
+        return storer;
+    }
+}

Added: pig/trunk/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java?rev=1711177&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java Thu Oct 
29 04:34:18 2015
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.pig.CounterBasedErrorHandler;
+import org.apache.pig.ErrorHandler;
+import org.apache.pig.ErrorHandling;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/**
+ * This class contains Unit tests for store func which has a certain error
+ * threshold set.
+ * 
+ */
+public class TestErrorHandlingStoreFunc {
+
+    private static PigServer pigServer;
+    private File tempDir;
+
+    @Before
+    public void setup() throws IOException {
+        pigServer = new PigServer(ExecType.LOCAL);
+        tempDir = Files.createTempDir();
+        tempDir.deleteOnExit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        pigServer.shutdown();
+        tempDir.delete();
+    }
+
+    public static class TestErroroneousStoreFunc extends PigStorage implements
+            ErrorHandling {
+        protected static AtomicLong COUNTER = new AtomicLong();
+
+        @Override
+        public void putNext(Tuple f) throws IOException {
+            long count = COUNTER.incrementAndGet();
+            super.putNext(f);
+            if (count % 3 == 0) {
+                throw new RuntimeException("Throw error for test");
+            }
+        }
+
+        @Override
+        public ErrorHandler getErrorHandler() {
+            return new CounterBasedErrorHandler();
+        }
+    }
+
+    public static class TestErroroneousStoreFunc2 extends PigStorage implements
+            ErrorHandling {
+        protected static AtomicLong COUNTER = new AtomicLong();
+
+        @Override
+        public void putNext(Tuple f) throws IOException {
+            long count = COUNTER.incrementAndGet();
+            super.putNext(f);
+            if (count % 3 == 0) {
+                throw new RuntimeException("Throw error for test");
+            }
+        }
+
+        @Override
+        public ErrorHandler getErrorHandler() {
+            return new CounterBasedErrorHandler();
+        }
+    }
+
+    /**
+     * Test Pig job succeeds even with errors within threshold
+     * 
+     */
+    @Test
+    public void testStorerWithErrorInLimit() throws Exception {
+        updatePigProperties(true, 3L, 0.4);
+        runTest(JOB_STATUS.COMPLETED);
+    }
+
+    /**
+     * Test Pig job fails if errors exceed min errors and threshold
+     */
+    @Test
+    public void testStorerWithErrorOutExceedingLimit() throws Exception {
+        updatePigProperties(true, 2L, 0.3);
+        runTest(JOB_STATUS.FAILED);
+    }
+
+    /**
+     * Test Pig Job fails on error if the config is set to false
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testStorerWithConfigNotEnabled() throws Exception {
+        updatePigProperties(false, 3L, 0.3);
+        runTest(JOB_STATUS.FAILED);
+    }
+
+    /**
+     * Test Pig Job with multiple stores.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testMultiStore() throws Exception {
+        updatePigProperties(true, 3L, 0.4);
+        pigServer.getPigContext().getProperties()
+                .put(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+        Data data = Storage.resetData(pigServer);
+        final Collection<Tuple> list = Lists.newArrayList();
+        // Create input dataset
+        int rows = 10;
+        for (int i = 0; i < rows; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            t.append(i);
+            t.append("a" + i);
+            list.add(t);
+        }
+        data.set("in", "id:int,name:chararray", list);
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+        String storeAQuery = "store A into '" + tempDir.getAbsolutePath()
+                + "/output' using " + TestErroroneousStoreFunc.class.getName()
+                + "();";
+        pigServer.registerQuery(storeAQuery);
+        pigServer.registerQuery("B = FILTER A by id >0;");
+        String storeBQuery = "store B into '" + tempDir.getAbsolutePath()
+                + "/output2' using "
+                + TestErroroneousStoreFunc2.class.getName() + "();";
+        pigServer.registerQuery(storeBQuery);
+        if (pigServer.executeBatch().get(0).getStatus() != 
JOB_STATUS.COMPLETED) {
+            throw new RuntimeException("Job failed", pigServer.executeBatch()
+                    .get(0).getException());
+        }
+    }
+
+    private void runTest(JOB_STATUS expectedJobStatus) throws Exception {
+        Data data = Storage.resetData(pigServer);
+        final Collection<Tuple> list = Lists.newArrayList();
+        // Create input dataset
+        int rows = 10;
+        for (int i = 0; i < rows; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            t.append(i);
+            t.append("a" + i);
+            list.add(t);
+        }
+        data.set("in", "id:int,name:chararray", list);
+
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+        String storeQuery = "store A into '" + tempDir.getAbsolutePath()
+                + "/output' using " + TestErroroneousStoreFunc.class.getName()
+                + "();";
+        pigServer.registerQuery(storeQuery);
+
+        if (pigServer.executeBatch().get(0).getStatus() != expectedJobStatus) {
+            throw new RuntimeException("Job did not reach the expected status"
+                    + pigServer.executeBatch().get(0).getStatus());
+        }
+    }
+
+    private void updatePigProperties(boolean allowErrors, long minErrors,
+            double errorThreshold) {
+        Properties properties = pigServer.getPigContext().getProperties();
+        properties.put(PigConfiguration.PIG_ALLOW_STORE_ERRORS,
+                Boolean.toString(allowErrors));
+        properties.put(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
+                Long.toString(minErrors));
+        properties.put(PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT,
+                Double.toString(errorThreshold));
+    }
+}


Reply via email to