This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bbe6f5  [BEAM-7450] Support unbounded reads with HCatalogIO
     new 19804ac  Merge pull request #8718: [BEAM-7450] Support unbounded reads with HCatalogIO
9bbe6f5 is described below

commit 9bbe6f523aac427958a4e99e4d729a80b105e63d
Author: Ankit Jhalaria <ajhala...@godaddy.com>
AuthorDate: Wed May 29 12:33:01 2019 -0700

    [BEAM-7450] Support unbounded reads with HCatalogIO
---
 .../apache/beam/sdk/io/hcatalog/HCatalogIO.java    |  91 +++++++++++++++--
 .../apache/beam/sdk/io/hcatalog/HCatalogUtils.java |  87 ++++++++++++++++
 .../beam/sdk/io/hcatalog/PartitionPollerFn.java    |  56 +++++++++++
 .../beam/sdk/io/hcatalog/PartitionReaderFn.java    | 111 +++++++++++++++++++++
 .../beam/sdk/io/hcatalog/HCatalogIOTest.java       |  80 +++++++++++++++
 5 files changed, 414 insertions(+), 11 deletions(-)

diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index 73518f6..05b43c6 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -33,15 +32,17 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.hadoop.WritableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -58,6 +59,7 @@ import org.apache.hive.hcatalog.data.transfer.ReadEntity;
 import org.apache.hive.hcatalog.data.transfer.ReaderContext;
 import org.apache.hive.hcatalog.data.transfer.WriteEntity;
 import org.apache.hive.hcatalog.data.transfer.WriterContext;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,6 +85,20 @@ import org.slf4j.LoggerFactory;
  *       .withFilter(filterString) //optional, may be specified if the table is partitioned
  * }</pre>
  *
+ * <p>The HCatalog source also supports reading HCatRecords in unbounded mode. In unbounded mode,
+ * HCatalogIO continuously polls for new partitions and reads their data. If a termination
+ * condition is provided, it stops reading once that condition is met.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(HCatalogIO.read()
+ *       .withConfigProperties(configProperties)
+ *       .withDatabase("default") //optional, assumes default if none specified
+ *       .withTable("employee")
+ *       .withPollingInterval(Duration.millis(15000)) // poll for new partitions every 15 seconds
+ *       .withTerminationCondition(Watch.Growth.afterTotalOf(Duration.millis(60000)))) //optional
+ * }</pre>
+ *
  * <h3>Writing using HCatalog</h3>
  *
  * <p>HCatalog sink supports writing of HCatRecord to a HCatalog-managed source, e.g. Hive.
@@ -120,7 +136,10 @@ public class HCatalogIO {
 
   /** Read data from Hive. */
   public static Read read() {
-    return new AutoValue_HCatalogIO_Read.Builder().setDatabase(DEFAULT_DATABASE).build();
+    return new AutoValue_HCatalogIO_Read.Builder()
+        .setDatabase(DEFAULT_DATABASE)
+        .setPartitionCols(new ArrayList<>())
+        .build();
   }
 
   private HCatalogIO() {}
@@ -129,6 +148,7 @@ public class HCatalogIO {
   @VisibleForTesting
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, PCollection<HCatRecord>> {
+
     @Nullable
     abstract Map<String, String> getConfigProperties();
 
@@ -147,6 +167,15 @@ public class HCatalogIO {
     @Nullable
     abstract Integer getSplitId();
 
+    @Nullable
+    abstract Duration getPollingInterval();
+
+    @Nullable
+    abstract List<String> getPartitionCols();
+
+    @Nullable
+    abstract TerminationCondition<Read, ?> getTerminationCondition();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -163,6 +192,12 @@ public class HCatalogIO {
 
       abstract Builder setContext(ReaderContext context);
 
+      abstract Builder setPollingInterval(Duration pollingInterval);
+
+      abstract Builder setPartitionCols(List<String> partitionCols);
+
+      abstract Builder setTerminationCondition(TerminationCondition<Read, ?> terminationCondition);
+
       abstract Read build();
     }
 
@@ -186,6 +221,28 @@ public class HCatalogIO {
       return toBuilder().setFilter(filter).build();
     }
 
+    /**
+     * If specified, polling for new partitions happens at this interval and the returned
+     * PCollection is unbounded. If a termination condition is also set via
+     * withTerminationCondition, polling stops once that condition is met.
+     */
+    public Read withPollingInterval(Duration pollingInterval) {
+      return toBuilder().setPollingInterval(pollingInterval).build();
+    }
+
+    /** Sets the names of the table's partition columns. */
+    public Read withPartitionCols(List<String> partitionCols) {
+      return toBuilder().setPartitionCols(partitionCols).build();
+    }
+
+    /**
+     * If specified, the poll function stops polling once this termination condition is satisfied.
+     */
+    public Read withTerminationCondition(TerminationCondition<Read, ?> terminationCondition) {
+      return toBuilder().setTerminationCondition(terminationCondition).build();
+    }
+
     Read withSplitId(int splitId) {
       checkArgument(splitId >= 0, "Invalid split id-" + splitId);
       return toBuilder().setSplitId(splitId).build();
@@ -196,11 +253,27 @@ public class HCatalogIO {
     }
 
     @Override
+    @SuppressWarnings("deprecation")
     public PCollection<HCatRecord> expand(PBegin input) {
       checkArgument(getTable() != null, "withTable() is required");
       checkArgument(getConfigProperties() != null, "withConfigProperties() is required");
-
-      return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this)));
+      Watch.Growth<Read, Integer, Integer> growthFn;
+      if (getPollingInterval() != null) {
+        growthFn = Watch.growthOf(new PartitionPollerFn()).withPollInterval(getPollingInterval());
+        if (getTerminationCondition() != null) {
+          growthFn = growthFn.withTerminationPerInput(getTerminationCondition());
+        }
+        return input
+            .apply("ConvertToReadRequest", Create.of(this))
+            .apply("WatchForNewPartitions", growthFn)
+            .apply("PartitionReader", ParDo.of(new 
PartitionReaderFn(getConfigProperties())));
+      } else {
+        // No polling interval set: treat this as a bounded read.
+        checkArgument(
+            getTerminationCondition() == null,
+            "withTerminationCondition() requires withPollingInterval() and cannot be used in bounded mode");
+        return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this)));
+      }
     }
 
     @Override
@@ -244,14 +317,10 @@ public class HCatalogIO {
      */
     @Override
     public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
-      Configuration conf = new Configuration();
-      for (Entry<String, String> entry : spec.getConfigProperties().entrySet()) {
-        conf.set(entry.getKey(), entry.getValue());
-      }
       IMetaStoreClient client = null;
       try {
-        HiveConf hiveConf = HCatUtil.getHiveConf(conf);
-        client = HCatUtil.getHiveMetastoreClient(hiveConf);
+        HiveConf hiveConf = HCatalogUtils.createHiveConf(spec);
+        client = HCatalogUtils.createMetaStoreClient(hiveConf);
         Table table = HCatUtil.getTable(client, spec.getDatabase(), spec.getTable());
         return StatsUtils.getFileSizeForTable(hiveConf, table);
       } finally {
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogUtils.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogUtils.java
new file mode 100644
index 0000000..bf3638e
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hcatalog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.hcatalog.HCatalogIO.Read;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hive.hcatalog.common.HCatUtil;
+
+/** Utilities for creating metastore configurations and clients. */
+public class HCatalogUtils {
+
+  private static final int DESIRED_BUNDLE_SIZE_BYTES = 134217728; // 128 MB
+
+  static IMetaStoreClient createMetaStoreClient(Configuration conf)
+      throws IOException, MetaException {
+    final HiveConf hiveConf = HCatUtil.getHiveConf(conf);
+    return HCatUtil.getHiveMetastoreClient(hiveConf);
+  }
+
+  static HiveConf createHiveConf(Read readRequest) throws IOException {
+    Configuration conf = createConfiguration(readRequest.getConfigProperties());
+    return HCatUtil.getHiveConf(conf);
+  }
+
+  static int getSplitCount(Read readRequest, Partition partitionToRead) throws Exception {
+    int desiredSplitCount = 1;
+    long estimatedSizeBytes = getFileSizeForPartition(readRequest, partitionToRead);
+    if (estimatedSizeBytes > 0) {
+      desiredSplitCount = (int) Math.ceil((double) estimatedSizeBytes / DESIRED_BUNDLE_SIZE_BYTES);
+    }
+    return desiredSplitCount;
+  }
+
+  static Configuration createConfiguration(Map<String, String> configProperties) {
+    Configuration conf = new Configuration();
+    for (Map.Entry<String, String> entry : configProperties.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+    return conf;
+  }
+
+  private static long getFileSizeForPartition(Read readRequest, Partition partitionToRead)
+      throws Exception {
+    IMetaStoreClient client = null;
+    try {
+      HiveConf hiveConf = HCatalogUtils.createHiveConf(readRequest);
+      client = HCatalogUtils.createMetaStoreClient(hiveConf);
+      List<org.apache.hadoop.hive.ql.metadata.Partition> p = new ArrayList<>();
+      Table table = HCatUtil.getTable(client, readRequest.getDatabase(), readRequest.getTable());
+      final org.apache.hadoop.hive.ql.metadata.Partition partition =
+          new org.apache.hadoop.hive.ql.metadata.Partition(table, partitionToRead);
+      p.add(partition);
+      final List<Long> fileSizeForPartitions = StatsUtils.getFileSizeForPartitions(hiveConf, p);
+      return fileSizeForPartitions.get(0);
+    } finally {
+      // IMetaStoreClient is not AutoCloseable, closing it manually
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+}
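
As a quick illustration of the bundle-sizing arithmetic in getSplitCount above
(a sketch for this note, not part of the commit; the constant and rounding
mirror the code):

    // A partition's estimated size is rounded up to whole 128 MB bundles.
    long estimatedSizeBytes = 300L * 1024 * 1024;   // ~300 MB of partition data
    int desiredBundleSizeBytes = 134217728;         // 128 MB, as in HCatalogUtils
    int splits = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes);
    // splits == 3; any size under one bundle still yields a single split.
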
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionPollerFn.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionPollerFn.java
new file mode 100644
index 0000000..2e40710
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionPollerFn.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hcatalog;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.io.hcatalog.HCatalogIO.Read;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.joda.time.Instant;
+
+/** Returns the indices of the partitions currently present in the table. */
+class PartitionPollerFn extends PollFn<Read, Integer> {
+  private transient IMetaStoreClient metaStoreClient;
+
+  @Override
+  public PollResult<Integer> apply(Read element, Context c) throws Exception {
+    final Configuration conf = HCatalogUtils.createConfiguration(element.getConfigProperties());
+    metaStoreClient = HCatalogUtils.createMetaStoreClient(conf);
+    final Instant now = Instant.now();
+    final PollResult<Integer> pollResult =
+        PollResult.incomplete(now, getPartitionIndices(element)).withWatermark(now);
+    if (metaStoreClient != null) {
+      metaStoreClient.close();
+    }
+    return pollResult;
+  }
+
+  private List<Integer> getPartitionIndices(Read read) throws Exception {
+    return IntStream.range(
+            0,
+            metaStoreClient
+                .listPartitions(read.getDatabase(), read.getTable(), Short.MAX_VALUE)
+                .size())
+        .boxed()
+        .collect(Collectors.toList());
+  }
+}
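
For context, a minimal sketch of how a PollFn like this plugs into Watch; it
mirrors the unbounded branch of Read.expand() above, with illustrative interval
and termination values:

    // Watch invokes the PollFn per input Read and emits each newly observed
    // partition index downstream as a KV<Read, Integer>.
    Watch.Growth<Read, Integer, Integer> growthFn =
        Watch.growthOf(new PartitionPollerFn())
            .withPollInterval(Duration.millis(15000))
            .withTerminationPerInput(Watch.Growth.afterTotalOf(Duration.millis(60000)));
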
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionReaderFn.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionReaderFn.java
new file mode 100644
index 0000000..70d0529
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionReaderFn.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hcatalog;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.hcatalog.HCatalogIO.Read;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
+import org.apache.hive.hcatalog.data.transfer.ReadEntity;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+
+/** Reads the partition at a given index. */
+class PartitionReaderFn extends DoFn<KV<Read, Integer>, HCatRecord> {
+  private transient IMetaStoreClient metaStoreClient;
+  private Map<String, String> configProperties;
+
+  public PartitionReaderFn(Map<String, String> configProperties) {
+    this.configProperties = configProperties;
+  }
+
+  private ReaderContext getReaderContext(Read readRequest, Integer partitionIndexToRead)
+      throws Exception {
+    final List<Partition> partitions =
+        metaStoreClient.listPartitions(
+            readRequest.getDatabase(), readRequest.getTable(), Short.MAX_VALUE);
+    final Partition partition = partitions.get(partitionIndexToRead);
+    checkArgument(
+        partition != null, "Unable to find a partition to read at index " + partitionIndexToRead);
+
+    final int desiredSplitCount = HCatalogUtils.getSplitCount(readRequest, partition);
+    final List<String> values = partition.getValues();
+    final List<String> partitionCols = readRequest.getPartitionCols();
+    checkArgument(
+        values.size() == partitionCols.size(),
+        "Number of input partitions should be equal to the values of list 
partition values.");
+
+    List<String> filter = new ArrayList<>();
+    for (int i = 0; i < partitionCols.size(); i++) {
+      filter.add(partitionCols.get(i) + "=" + "'" + values.get(i) + "'");
+    }
+    final String filterString = String.join(" and ", filter);
+
+    ReadEntity entity =
+        new ReadEntity.Builder()
+            .withDatabase(readRequest.getDatabase())
+            .withFilter(filterString)
+            .withTable(readRequest.getTable())
+            .build();
+    // Pass the desired split count as a hint to the API.
+    Map<String, String> configProps = new HashMap<>(readRequest.getConfigProperties());
+    configProps.put(
+        HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, String.valueOf(desiredSplitCount));
+    return DataTransferFactory.getHCatReader(entity, configProps).prepareRead();
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) throws Exception {
+    final Read readRequest = c.element().getKey();
+    final Integer partitionIndexToRead = c.element().getValue();
+    ReaderContext readerContext = getReaderContext(readRequest, partitionIndexToRead);
+    for (int i = 0; i < readerContext.numSplits(); i++) {
+      HCatReader reader = DataTransferFactory.getHCatReader(readerContext, i);
+      Iterator<HCatRecord> hcatIterator = reader.read();
+      while (hcatIterator.hasNext()) {
+        final HCatRecord record = hcatIterator.next();
+        c.output(record);
+      }
+    }
+  }
+
+  @Setup
+  public void setup() throws Exception {
+    final Configuration conf = HCatalogUtils.createConfiguration(configProperties);
+    metaStoreClient = HCatalogUtils.createMetaStoreClient(conf);
+  }
+
+  @Teardown
+  public void teardown() {
+    if (metaStoreClient != null) {
+      metaStoreClient.close();
+    }
+  }
+}
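
To make the filter construction in getReaderContext concrete, here is what it
builds for the partition used by the test fixture below (a fragment; java.util
imports assumed):

    List<String> partitionCols = Arrays.asList("load_date", "product_type");
    List<String> values = Arrays.asList("2019-05-14T23:28:04.425Z", "1");
    List<String> filter = new ArrayList<>();
    for (int i = 0; i < partitionCols.size(); i++) {
      filter.add(partitionCols.get(i) + "='" + values.get(i) + "'");
    }
    String filterString = String.join(" and ", filter);
    // filterString: load_date='2019-05-14T23:28:04.425Z' and product_type='1'
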
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
index da631a3..7f925ef 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
@@ -40,8 +40,12 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.WritableCoder;
 import org.apache.beam.sdk.io.hcatalog.HCatalogIO.BoundedHCatalogSource;
 import org.apache.beam.sdk.io.hcatalog.test.EmbeddedMetastoreService;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -52,11 +56,16 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.joda.time.Duration;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -96,6 +105,9 @@ public class HCatalogIOTest implements Serializable {
                 prepareTestData();
               } else if (description.getAnnotation(NeedsEmptyTestTables.class) != null) {
                 reCreateTestTable();
+              } else if (description.getAnnotation(NeedsEmptyTestTablesForUnboundedReads.class)
+                  != null) {
+                reCreateTestTableForUnboundedReads();
               }
               base.evaluate();
             }
@@ -110,6 +122,11 @@ public class HCatalogIOTest implements Serializable {
   @Target({ElementType.METHOD})
   private @interface NeedsTestData {}
 
+  /** Use this annotation to set up empty, partitioned test tables for unbounded-read tests. */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.METHOD})
+  private @interface NeedsEmptyTestTablesForUnboundedReads {}
+
   /** Use this annotation to set up test tables alone (empty tables, no records are populated). */
   @Retention(RetentionPolicy.RUNTIME)
   @Target({ElementType.METHOD})
@@ -163,6 +180,56 @@ public class HCatalogIOTest implements Serializable {
     readAfterWritePipeline.run();
   }
 
+  private Map<String, String> getPartitions() {
+    Map<String, String> partitions = new HashMap<>();
+    partitions.put("load_date", "2019-05-14T23:28:04.425Z");
+    partitions.put("product_type", "1");
+    return partitions;
+  }
+
+  /** Perform end-to-end test of write followed by an unbounded read. */
+  @Test
+  @NeedsEmptyTestTablesForUnboundedReads
+  public void testWriteThenUnboundedReadSuccess() throws Exception {
+
+    defaultPipeline
+        .apply(Create.of(buildHCatRecords(TEST_RECORDS_COUNT)))
+        .apply(
+            HCatalogIO.write()
+                .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+                .withDatabase(TEST_DATABASE)
+                .withTable(TEST_TABLE)
+                .withPartition(getPartitions())
+                .withBatchSize(512L));
+    defaultPipeline.run();
+    final ImmutableList<String> partitions = ImmutableList.of("load_date", "product_type");
+    final PCollection<HCatRecord> data =
+        readAfterWritePipeline
+            .apply(
+                "ReadData",
+                HCatalogIO.read()
+                    .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+                    .withDatabase(TEST_DATABASE)
+                    .withPartitionCols(partitions)
+                    .withTable(TEST_TABLE)
+                    .withPollingInterval(Duration.millis(15000))
+                    .withTerminationCondition(Watch.Growth.afterTotalOf(Duration.millis(60000))))
+            .setCoder((Coder) WritableCoder.of(DefaultHCatRecord.class));
+
+    final PCollection<String> output =
+        data.apply(
+            ParDo.of(
+                new DoFn<HCatRecord, String>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    c.output(c.element().get(0).toString());
+                  }
+                }));
+
+    PAssert.that(output).containsInAnyOrder(getExpectedRecords(TEST_RECORDS_COUNT));
+    readAfterWritePipeline.run();
+  }
+
   /** Test of Write to a non-existent table. */
   @Test
   public void testWriteFailureTableDoesNotExist() {
@@ -276,6 +343,19 @@ public class HCatalogIOTest implements Serializable {
     service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, 
mycol2 int)");
   }
 
+  private void reCreateTestTableForUnboundedReads() throws CommandNeedRetryException {
+    service.executeQuery("drop table " + TEST_TABLE);
+    service.executeQuery(
+        "create table "
+            + TEST_TABLE
+            + "(mycol1 string, mycol2 int)  "
+            + "partitioned by (load_date string, product_type string)");
+    service.executeQuery(
+        "ALTER TABLE "
+            + TEST_TABLE
+            + " ADD PARTITION (load_date='2019-05-14T23:28:04.425Z', 
product_type='1')");
+  }
+
   private void prepareTestData() throws Exception {
     reCreateTestTable();
     insertTestData(getConfigPropertiesAsMap(service.getHiveConf()));
