[
https://issues.apache.org/jira/browse/BEAM-3484?focusedWorklogId=92819&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92819
]
ASF GitHub Bot logged work on BEAM-3484:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Apr/18 20:04
Start Date: 19/Apr/18 20:04
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #5166: [BEAM-3484] Fix
split issue in HadoopInputFormatIOIT
URL: https://github.com/apache/beam/pull/5166
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/hadoop-input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
b/sdks/java/io/hadoop-input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index b22d57caa67..0ffd402320d 100644
---
a/sdks/java/io/hadoop-input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++
b/sdks/java/io/hadoop-input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,6 +164,21 @@
* .withValueTranslation(myOutputValueType);
* }
* </pre>
+ *
+ * <p>IMPORTANT! In case of using {@code DBInputFormat} to read data from
RDBMS, Beam parallelizes
+ * the process by using LIMIT and OFFSET clauses of SQL query to fetch
different ranges of records
+ * (as a split) by different workers. To guarantee the same order and proper
split of results you
+ * need to order them by one or more keys (either PRIMARY or UNIQUE). It can
be done during
+ * configuration step, for example:
+ *
+ * <pre>
+ * {@code
+ * Configuration conf = new Configuration();
+ * conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
+ * conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
+ * conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "id ASC");
+ * }
+ * </pre>
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class HadoopInputFormatIO {
@@ -283,7 +299,9 @@
/**
* Validates that the mandatory configuration properties such as
InputFormat class, InputFormat
- * key and value classes are provided in the Hadoop configuration.
+ * key and value classes are provided in the Hadoop configuration. In case
of using {@code
+ * DBInputFormat} you need to order results by one or more keys. It can be
done by setting
+ * configuration option "mapreduce.jdbc.input.orderby".
*/
private void validateConfiguration(Configuration configuration) {
checkArgument(configuration != null, "configuration can not be null");
@@ -294,6 +312,13 @@ private void validateConfiguration(Configuration
configuration) {
configuration.get("key.class") != null, "configuration must contain
\"key.class\"");
checkArgument(
configuration.get("value.class") != null, "configuration must
contain \"value.class\"");
+ if
(configuration.get("mapreduce.job.inputformat.class").endsWith("DBInputFormat"))
{
+ checkArgument(
+ configuration.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY) != null,
+ "Configuration must contain \""
+ + DBConfiguration.INPUT_ORDER_BY_PROPERTY
+ + "\" when using DBInputFormat");
+ }
}
/**
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
index 58f3b0dafa0..e24dd68dd2c 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
@@ -110,6 +110,7 @@ private static void
setupHadoopConfiguration(IOTestPipelineOptions options) {
);
conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
+ conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "id ASC");
conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY,
TestRowDBWritable.class, DBWritable.class);
conf.setClass("key.class", LongWritable.class, Object.class);
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index 4238e9b6505..c472442b09c 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -49,6 +49,7 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -805,6 +806,20 @@ public void
testImmutablityOfOutputOfReadIfRecordReaderObjectsAreImmutable() thr
assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray()));
}
+ @Test
+ public void testValidateConfigurationWithDBInputFormat() {
+ Configuration conf = new Configuration();
+ conf.setClass("key.class", LongWritable.class, Object.class);
+ conf.setClass("value.class", Text.class, Object.class);
+ conf.setClass("mapreduce.job.inputformat.class", DBInputFormat.class,
InputFormat.class);
+
+ thrown.expect(IllegalArgumentException.class);
+ HadoopInputFormatIO.<String, String>read()
+ .withConfiguration(new SerializableConfiguration(conf).get())
+ .withKeyTranslation(myKeyTranslate)
+ .withValueTranslation(myValueTranslate);
+ }
+
private static SerializableConfiguration loadTestConfiguration(Class<?>
inputFormatClassName,
Class<?> keyClass, Class<?> valueClass) {
Configuration conf = new Configuration();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 92819)
Time Spent: 2h (was: 1h 50m)
> HadoopInputFormatIO reads big datasets invalid
> ----------------------------------------------
>
> Key: BEAM-3484
> URL: https://issues.apache.org/jira/browse/BEAM-3484
> Project: Beam
> Issue Type: Bug
> Components: io-java-hadoop
> Affects Versions: 2.3.0, 2.4.0
> Reporter: Łukasz Gajowy
> Assignee: Alexey Romanenko
> Priority: Minor
> Fix For: 2.5.0
>
> Attachments: result_sorted1000000, result_sorted600000
>
> Time Spent: 2h
> Remaining Estimate: 0h
>
> For big datasets HadoopInputFormat sometimes skips/duplicates elements from
> the database in the resulting PCollection. This gives an incorrect read result.
> Occurred to me while developing HadoopInputFormatIOIT and running it on
> dataflow. For datasets smaller than or equal to 600 000 database rows I wasn't
> able to reproduce the issue. The bug appeared only for bigger sets, e.g. 700 000,
> 1 000 000.
> Attachments:
> - text file with sorted HadoopInputFormat.read() result saved using
> TextIO.write().to().withoutSharding(). If you look carefully you'll notice
> duplicates or missing values that should not happen
> - same text file for 600 000 records not having any duplicates or missing
> elements
> - link to a PR with a HadoopInputFormatIO integration test that makes it
> possible to reproduce this issue. At the moment of writing, this code is not merged yet.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)