Repository: incubator-gobblin Updated Branches: refs/heads/master 7df0878e0 -> 56ead8845
[GOBBLIN-348] Added Hdfs Modified Time based Hive Version Finder Closes #2211 from aditya1105/HiveSource Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/56ead884 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/56ead884 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/56ead884 Branch: refs/heads/master Commit: 56ead8845f420248b76329f1736ecd593e693536 Parents: 7df0878 Author: adsharma <[email protected]> Authored: Sat Dec 16 12:23:09 2017 -0800 Committer: Abhishek Tiwari <[email protected]> Committed: Sat Dec 16 12:23:09 2017 -0800 ---------------------------------------------------------------------- .../HdfsModifiedTimeHiveVersionFinder.java | 67 +++++++++++++++ .../HdfsModifiedTimeHiveVersionFinderTest.java | 87 ++++++++++++++++++++ 2 files changed, 154 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56ead884/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java new file mode 100644 index 0000000..8a7cb87 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.data.management.version.finder; + +import java.io.IOException; + +import org.apache.gobblin.data.management.version.TimestampedHiveDatasetVersion; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.joda.time.DateTime; + +import com.typesafe.config.Config; + +import parquet.Preconditions; + + +/** + * A Hive Partition finder where the version is the partition value. 
+ */ +public class HdfsModifiedTimeHiveVersionFinder extends AbstractHiveDatasetVersionFinder { + private final FileSystem fs; + + public HdfsModifiedTimeHiveVersionFinder(FileSystem fs, Config config) { + this.fs = fs; + } + + /** + * Create a {@link TimestampedHiveDatasetVersion} from a {@link Partition} based on the Modified time of underlying + * hdfs data location + * @throws IllegalArgumentException when argument is null + * @throws IllegalArgumentException when data location of partition is null + * @throws IllegalArgumentException when data location of partition doesn't exist + * {@inheritDoc} + */ + @Override + protected TimestampedHiveDatasetVersion getDatasetVersion(Partition partition) { + try { + Preconditions.checkArgument(partition != null, "Argument to method "); + + Path dataLocation = partition.getDataLocation(); + Preconditions + .checkArgument(dataLocation != null, "Data location is null for partition " + partition.getCompleteName()); + boolean exists = this.fs.exists(dataLocation); + Preconditions.checkArgument(exists, "Data location doesn't exist for partition " + partition.getCompleteName()); + + long modificationTS = this.fs.getFileStatus(dataLocation).getModificationTime(); + return new TimestampedHiveDatasetVersion(new DateTime(modificationTS), partition); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56ead884/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinderTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinderTest.java new file mode 100644 index 0000000..46d2745 --- /dev/null +++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinderTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.data.management.version.finder; + +import java.io.IOException; + +import org.apache.gobblin.data.management.version.TimestampedHiveDatasetVersion; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.typesafe.config.ConfigFactory; + + +@Test(groups = {"gobblin.data.management.version"}) +public class HdfsModifiedTimeHiveVersionFinderTest { + public static final String PARTITION_NAME = "RandomDb@RandomTable@RandomPartition"; + public static final String TIMESTAMP = "123456"; + private FileSystem fs = Mockito.mock(FileSystem.class); + private Partition partition = Mockito.mock(Partition.class); + private FileStatus fileStatus = Mockito.mock(FileStatus.class); + private HdfsModifiedTimeHiveVersionFinder 
hdfsModifiedTimeHiveVersionFinder = + new HdfsModifiedTimeHiveVersionFinder(this.fs, ConfigFactory.empty()); + + @BeforeMethod + public void initialize() { + Mockito.doReturn(PARTITION_NAME).when(this.partition).getCompleteName(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testNullPartition() { + Partition partition = null; + TimestampedHiveDatasetVersion datasetVersion = this.hdfsModifiedTimeHiveVersionFinder.getDatasetVersion(partition); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testNullDataLocation() { + Mockito.doReturn(null).when(this.partition).getDataLocation(); + TimestampedHiveDatasetVersion datasetVersion = + this.hdfsModifiedTimeHiveVersionFinder.getDatasetVersion(this.partition); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testInvalidDataLocation() + throws IOException { + Mockito.doReturn(new Path("Invalid Location")).when(this.partition).getDataLocation(); + Mockito.doReturn(false).when(this.fs).exists(Mockito.any(Path.class)); + TimestampedHiveDatasetVersion datasetVersion = + this.hdfsModifiedTimeHiveVersionFinder.getDatasetVersion(this.partition); + } + + @Test + public void testTimeStampForVersion() + throws IOException { + Mockito.doReturn(new Path("Invalid Location")).when(this.partition).getDataLocation(); + Mockito.doReturn(true).when(this.fs).exists(Mockito.any(Path.class)); + Mockito.doReturn(this.fileStatus).when(this.fs).getFileStatus(Mockito.any(Path.class)); + Mockito.doReturn(Long.valueOf(TIMESTAMP)).when(this.fileStatus).getModificationTime(); + TimestampedHiveDatasetVersion datasetVersion = + this.hdfsModifiedTimeHiveVersionFinder.getDatasetVersion(this.partition); + + // Check if the datasetVersion contains the correct partition + Assert.assertTrue(datasetVersion.getPartition().getCompleteName().equalsIgnoreCase(PARTITION_NAME)); + // Check if the datasetVersion contains the correct modified timestamp of the underlying 
data location + Assert.assertTrue(datasetVersion.getDateTime().getMillis() == Long.valueOf(TIMESTAMP)); + System.out.println(datasetVersion); + } +}
