[ 
https://issues.apache.org/jira/browse/BEAM-3484?focusedWorklogId=92819&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92819
 ]

ASF GitHub Bot logged work on BEAM-3484:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Apr/18 20:04
            Start Date: 19/Apr/18 20:04
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #5166: [BEAM-3484] Fix 
split issue in HadoopInputFormatIOIT
URL: https://github.com/apache/beam/pull/5166
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/hadoop-input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
 
b/sdks/java/io/hadoop-input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index b22d57caa67..0ffd402320d 100644
--- 
a/sdks/java/io/hadoop-input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ 
b/sdks/java/io/hadoop-input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -62,6 +62,7 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -163,6 +164,21 @@
  *              .withValueTranslation(myOutputValueType);
  * }
  * </pre>
+ *
+ * <p>IMPORTANT! In case of using {@code DBInputFormat} to read data from 
RDBMS, Beam parallelizes
+ * the process by using LIMIT and OFFSET clauses of SQL query to fetch 
different ranges of records
+ * (as a split) by different workers. To guarantee the same order and proper 
split of results you
+ * need to order them by one or more keys (either PRIMARY or UNIQUE). It can 
be done during
+ * configuration step, for example:
+ *
+ * <pre>
+ * {@code
+ * Configuration conf = new Configuration();
+ * conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
+ * conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
+ * conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "id ASC");
+ * }
+ * </pre>
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class HadoopInputFormatIO {
@@ -283,7 +299,9 @@
 
     /**
      * Validates that the mandatory configuration properties such as 
InputFormat class, InputFormat
-     * key and value classes are provided in the Hadoop configuration.
+     * key and value classes are provided in the Hadoop configuration. In case 
of using {@code
+     * DBInputFormat} you need to order results by one or more keys. It can be 
done by setting
+     * configuration option "mapreduce.jdbc.input.orderby".
      */
     private void validateConfiguration(Configuration configuration) {
       checkArgument(configuration != null, "configuration can not be null");
@@ -294,6 +312,13 @@ private void validateConfiguration(Configuration 
configuration) {
           configuration.get("key.class") != null, "configuration must contain 
\"key.class\"");
       checkArgument(
           configuration.get("value.class") != null, "configuration must 
contain \"value.class\"");
+      if 
(configuration.get("mapreduce.job.inputformat.class").endsWith("DBInputFormat"))
 {
+        checkArgument(
+            configuration.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY) != null,
+            "Configuration must contain \""
+                + DBConfiguration.INPUT_ORDER_BY_PROPERTY
+                + "\" when using DBInputFormat");
+      }
     }
 
     /**
diff --git 
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
 
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
index 58f3b0dafa0..e24dd68dd2c 100644
--- 
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
+++ 
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
@@ -110,6 +110,7 @@ private static void 
setupHadoopConfiguration(IOTestPipelineOptions options) {
     );
     conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
     conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
+    conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "id ASC");
     conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, 
TestRowDBWritable.class, DBWritable.class);
 
     conf.setClass("key.class", LongWritable.class, Object.class);
diff --git 
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
 
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index 4238e9b6505..c472442b09c 100644
--- 
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ 
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -805,6 +806,20 @@ public void 
testImmutablityOfOutputOfReadIfRecordReaderObjectsAreImmutable() thr
     assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray()));
   }
 
+  @Test
+  public void testValidateConfigurationWithDBInputFormat() {
+    Configuration conf = new Configuration();
+    conf.setClass("key.class", LongWritable.class, Object.class);
+    conf.setClass("value.class", Text.class, Object.class);
+    conf.setClass("mapreduce.job.inputformat.class", DBInputFormat.class, 
InputFormat.class);
+
+    thrown.expect(IllegalArgumentException.class);
+    HadoopInputFormatIO.<String, String>read()
+        .withConfiguration(new SerializableConfiguration(conf).get())
+        .withKeyTranslation(myKeyTranslate)
+        .withValueTranslation(myValueTranslate);
+  }
+
   private static SerializableConfiguration loadTestConfiguration(Class<?> 
inputFormatClassName,
       Class<?> keyClass, Class<?> valueClass) {
     Configuration conf = new Configuration();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 92819)
    Time Spent: 2h  (was: 1h 50m)

> HadoopInputFormatIO reads big datasets invalid
> ----------------------------------------------
>
>                 Key: BEAM-3484
>                 URL: https://issues.apache.org/jira/browse/BEAM-3484
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-hadoop
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Łukasz Gajowy
>            Assignee: Alexey Romanenko
>            Priority: Minor
>             Fix For: 2.5.0
>
>         Attachments: result_sorted1000000, result_sorted600000
>
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> For big datasets HadoopInputFormat sometimes skips/duplicates elements from 
> database in resulting PCollection. This gives incorrect read result.
> Occurred to me while developing HadoopInputFormatIOIT and running it on 
> dataflow. For datasets smaller or equal to 600 000 database rows I wasn't 
> able to reproduce the issue. Bug appeared only for bigger sets, eg. 700 000, 
> 1 000 000. 
> Attachments:
>   - text file with sorted HadoopInputFormat.read() result saved using 
> TextIO.write().to().withoutSharding(). If you look carefully you'll notice 
> duplicates or missing values that should not happen
>  - same text file for 600 000 records not having any duplicates and missing 
> elements
>  - link to a PR with HadoopInputFormatIO integration test that allows to 
> reproduce this issue. At the moment of writing, this code is not merged yet.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to