Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 7df0878e0 -> 56ead8845


[GOBBLIN-348] Added Hdfs Modified Time based Hive Version Finder

Closes #2211 from aditya1105/HiveSource


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/56ead884
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/56ead884
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/56ead884

Branch: refs/heads/master
Commit: 56ead8845f420248b76329f1736ecd593e693536
Parents: 7df0878
Author: adsharma <[email protected]>
Authored: Sat Dec 16 12:23:09 2017 -0800
Committer: Abhishek Tiwari <[email protected]>
Committed: Sat Dec 16 12:23:09 2017 -0800

----------------------------------------------------------------------
 .../HdfsModifiedTimeHiveVersionFinder.java      | 67 +++++++++++++++
 .../HdfsModifiedTimeHiveVersionFinderTest.java  | 87 ++++++++++++++++++++
 2 files changed, 154 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56ead884/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java
new file mode 100644
index 0000000..8a7cb87
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.version.finder;
+
+import java.io.IOException;
+
+import 
org.apache.gobblin.data.management.version.TimestampedHiveDatasetVersion;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.joda.time.DateTime;
+
+import com.typesafe.config.Config;
+
+import parquet.Preconditions;
+
+
+/**
+ * A Hive Partition finder where the the version is the partition value.
+ */
+public class HdfsModifiedTimeHiveVersionFinder extends 
AbstractHiveDatasetVersionFinder {
+  private final FileSystem fs;
+
+  public HdfsModifiedTimeHiveVersionFinder(FileSystem fs, Config config) {
+    this.fs = fs;
+  }
+
+  /**
+   * Create a {@link TimestampedHiveDatasetVersion} from a {@link Partition} 
based on the Modified time of underlying
+   * hdfs data location
+   * @throws IllegalArgumentException when argument is null
+   * @throws IllegalArgumentException when data location of partition is null
+   * @throws IllegalArgumentException when data location of partition doesn't 
exist
+   * {@inheritDoc}
+   */
+  @Override
+  protected TimestampedHiveDatasetVersion getDatasetVersion(Partition 
partition) {
+    try {
+      Preconditions.checkArgument(partition != null, "Argument to method ");
+
+      Path dataLocation = partition.getDataLocation();
+      Preconditions
+          .checkArgument(dataLocation != null, "Data location is null for 
partition " + partition.getCompleteName());
+      boolean exists = this.fs.exists(dataLocation);
+      Preconditions.checkArgument(exists, "Data location doesn't exist for 
partition " + partition.getCompleteName());
+
+      long modificationTS = 
this.fs.getFileStatus(dataLocation).getModificationTime();
+      return new TimestampedHiveDatasetVersion(new DateTime(modificationTS), 
partition);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56ead884/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinderTest.java
new file mode 100644
index 0000000..46d2745
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinderTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.version.finder;
+
+import java.io.IOException;
+
+import 
org.apache.gobblin.data.management.version.TimestampedHiveDatasetVersion;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+
+
+@Test(groups = {"gobblin.data.management.version"})
+public class HdfsModifiedTimeHiveVersionFinderTest {
+  public static final String PARTITION_NAME = 
"RandomDb@RandomTable@RandomPartition";
+  public static final String TIMESTAMP = "123456";
+  private FileSystem fs = Mockito.mock(FileSystem.class);
+  private Partition partition = Mockito.mock(Partition.class);
+  private FileStatus fileStatus = Mockito.mock(FileStatus.class);
+  private HdfsModifiedTimeHiveVersionFinder hdfsModifiedTimeHiveVersionFinder =
+      new HdfsModifiedTimeHiveVersionFinder(this.fs, ConfigFactory.empty());
+
+  @BeforeMethod
+  public void initialize() {
+    Mockito.doReturn(PARTITION_NAME).when(this.partition).getCompleteName();
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testNullPartition() {
+    Partition partition = null;
+    TimestampedHiveDatasetVersion datasetVersion = 
this.hdfsModifiedTimeHiveVersionFinder.getDatasetVersion(partition);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testNullDataLocation() {
+    Mockito.doReturn(null).when(this.partition).getDataLocation();
+    TimestampedHiveDatasetVersion datasetVersion =
+        
this.hdfsModifiedTimeHiveVersionFinder.getDatasetVersion(this.partition);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testInvalidDataLocation()
+      throws IOException {
+    Mockito.doReturn(new Path("Invalid 
Location")).when(this.partition).getDataLocation();
+    Mockito.doReturn(false).when(this.fs).exists(Mockito.any(Path.class));
+    TimestampedHiveDatasetVersion datasetVersion =
+        
this.hdfsModifiedTimeHiveVersionFinder.getDatasetVersion(this.partition);
+  }
+
+  @Test
+  public void testTimeStampForVersion()
+      throws IOException {
+    Mockito.doReturn(new Path("Invalid 
Location")).when(this.partition).getDataLocation();
+    Mockito.doReturn(true).when(this.fs).exists(Mockito.any(Path.class));
+    
Mockito.doReturn(this.fileStatus).when(this.fs).getFileStatus(Mockito.any(Path.class));
+    
Mockito.doReturn(Long.valueOf(TIMESTAMP)).when(this.fileStatus).getModificationTime();
+    TimestampedHiveDatasetVersion datasetVersion =
+        
this.hdfsModifiedTimeHiveVersionFinder.getDatasetVersion(this.partition);
+
+    // Check if the datasetVersion contains the correct partition
+    
Assert.assertTrue(datasetVersion.getPartition().getCompleteName().equalsIgnoreCase(PARTITION_NAME));
+    // Check if the datasetVersion contains the correct modified timestamp of 
the underlying data location
+    Assert.assertTrue(datasetVersion.getDateTime().getMillis() == 
Long.valueOf(TIMESTAMP));
+    System.out.println(datasetVersion);
+  }
+}

Reply via email to