Repository: incubator-gobblin Updated Branches: refs/heads/master 784d7106f -> 4cab75d15
[GOBBLIN-177] Added error limit for records failed during conversion in batch execution Dear Gobblin maintainers, Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below! ### JIRA - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR" - https://issues.apache.org/jira/browse/GOBBLIN-177 ### Description - [ ] Here are some details about my PR, including screenshots (if applicable): Modified Task.java to have a configurable error limit for DataConversionExceptions. Also added an integration test to verify the same. ### Tests - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: Added test TaskSkipErrRecordsIntegrationTest ### Commits - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 2. Subject is limited to 50 characters 3. Subject does not end with a period 4. Subject uses the imperative mood ("add", not "adding") 5. Body wraps at 72 characters 6. 
Body explains "what" and "why", not "how" Closes #2065 from aditya1105/master Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/4cab75d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/4cab75d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/4cab75d1 Branch: refs/heads/master Commit: 4cab75d158b2db6b5aaac7b3c253283330ddde6d Parents: 784d710 Author: aditya1105 <[email protected]> Authored: Mon Aug 21 22:55:11 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Mon Aug 21 22:55:11 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/gobblin/runtime/Task.java | 17 +++++- .../gobblin/runtime/TaskConfigurationKeys.java | 3 + .../TaskSkipErrRecordsIntegrationTest.java | 63 ++++++++++++++++++++ .../org/apache/gobblin/TestAvroConverter.java | 33 ++++++++++ .../task_skip_err_records.properties | 42 +++++++++++++ 5 files changed, 156 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java index 20b183f..69c9a1c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.BooleanUtils; +import org.apache.gobblin.converter.DataConversionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -416,10 +417,22 @@ 
public class Task implements TaskIFace { } else { RecordEnvelope record; // Extract, convert, and fork one source record at a time. + long errRecords = 0; while ((record = extractor.readRecordEnvelope()) != null) { onRecordExtract(); - for (Object convertedRecord : converter.convertRecord(schema, record.getRecord(), this.taskState)) { - processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches, null); + try { + for (Object convertedRecord : converter.convertRecord(schema, record.getRecord(), this.taskState)) { + processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches, null); + } + } catch (Exception e) { + if (!(e instanceof DataConversionException) && !(e.getCause() instanceof DataConversionException)) { + throw new RuntimeException(e.getCause()); + } + errRecords++; + if (errRecords > this.taskState.getPropAsLong(TaskConfigurationKeys.TASK_SKIP_ERROR_RECORDS, + TaskConfigurationKeys.DEFAULT_TASK_SKIP_ERROR_RECORDS)) { + throw new RuntimeException(e); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java index 33e6fba..8325781 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskConfigurationKeys.java @@ -37,4 +37,7 @@ public class TaskConfigurationKeys { public static final String TASK_IS_SINGLE_BRANCH_SYNCHRONOUS = "gobblin.task.is.single.branch.synchronous"; public static final String DEFAULT_TASK_IS_SINGLE_BRANCH_SYNCHRONOUS = Boolean.toString(false); + + public static final String TASK_SKIP_ERROR_RECORDS = "task.skip.error.records"; + public static 
final long DEFAULT_TASK_SKIP_ERROR_RECORDS = 0; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java new file mode 100644 index 0000000..11eafef --- /dev/null +++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java @@ -0,0 +1,63 @@ +package org.apache.gobblin; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +import org.apache.commons.io.FileUtils; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.JobException; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + + +@Test +public class TaskSkipErrRecordsIntegrationTest { + private static final String SAMPLE_FILE = "test.avro"; + public static final String TASK_SKIP_ERROR_RECORDS = "task.skip.error.records"; + public static final String ONE = "1"; + public static final String ZERO = "0"; + + @BeforeTest + @AfterTest + public void cleanDir() + throws IOException { + GobblinLocalJobLauncherUtils.cleanDir(); + } + + /** + * Converter will throw DataConversionException while trying to convert + * first record. Since task.skip.error.records is set to 0, this job should fail. 
+ */ + @Test(expectedExceptions = JobException.class) + public void skipZeroErrorRecordTest() + throws Exception { + Properties jobProperties = getProperties(); + jobProperties.setProperty(TASK_SKIP_ERROR_RECORDS, ZERO); + GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties); + } + + /** + * Converter will throw DataConversionException while trying to convert + * first record. Since task.skip.error.records is set to 1, this job should succeed + */ + @Test + public void skipOneErrorRecordTest() + throws Exception { + Properties jobProperties = getProperties(); + jobProperties.setProperty(TASK_SKIP_ERROR_RECORDS, ONE); + GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties); + } + + private Properties getProperties() + throws IOException { + Properties jobProperties = + GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/task_skip_err_records.properties"); + FileUtils.copyFile(new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + SAMPLE_FILE), + new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE)); + jobProperties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, + GobblinLocalJobLauncherUtils.RESOURCE_DIR + GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE); + return jobProperties; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-test-harness/src/test/java/org/apache/gobblin/TestAvroConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TestAvroConverter.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TestAvroConverter.java new file mode 100644 index 0000000..d0eddba --- /dev/null +++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TestAvroConverter.java @@ -0,0 +1,33 @@ +package org.apache.gobblin; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import 
org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.converter.Converter; +import org.apache.gobblin.converter.DataConversionException; +import org.apache.gobblin.converter.SchemaConversionException; +import org.apache.gobblin.converter.SingleRecordIterable; + + +/** + * Test converter which throws DataConversionException while converting first record + */ +public class TestAvroConverter extends Converter<Schema, Schema, GenericRecord, GenericRecord> { + private long recordCount = 0; + + @Override + public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit) + throws SchemaConversionException { + return inputSchema; + } + + @Override + public Iterable<GenericRecord> convertRecord(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit) + throws DataConversionException { + recordCount++; + if (recordCount == 1) { + throw new DataConversionException("Unable to convert record"); + } + return new SingleRecordIterable<GenericRecord>(inputRecord); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4cab75d1/gobblin-test-harness/src/test/resources/runtime_test/task_skip_err_records.properties ---------------------------------------------------------------------- diff --git a/gobblin-test-harness/src/test/resources/runtime_test/task_skip_err_records.properties b/gobblin-test-harness/src/test/resources/runtime_test/task_skip_err_records.properties new file mode 100644 index 0000000..7e94ba8 --- /dev/null +++ b/gobblin-test-harness/src/test/resources/runtime_test/task_skip_err_records.properties @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +job.name=task_skip_err_records.job +job.commit.policy=full + +job.lock.enabled=false + +extract.namespace=data.writerOutput +extract.table.type=snapshot_append + +writer.destination.type=HDFS +writer.eager.initialization=true +writer.file.path=output +writer.output.format=AVRO + +state.store.dir=./gobblin-test-harness/src/test/resources/runtime_test/state_store +writer.staging.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_staging +writer.output.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_output +data.publisher.final.dir=./gobblin-test-harness/src/test/resources/runtime_test/final_dir + + +source.class=org.apache.gobblin.TestAvroSource +task.skip.error.records=1 +converter.classes=org.apache.gobblin.TestAvroConverter +publish.data.at.job.level=true +pubisher.class=org.apache.gobblin.publisher.BaseDataPublisher +task.maxretries=0 \ No newline at end of file
