This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new da914e272 Define basics for collecting Iceberg metadata for the 
current snapshot (#3559)
da914e272 is described below

commit da914e272c935c519af9d60c1b35a1ab59f1165e
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Sep 13 11:40:07 2022 -0700

    Define basics for collecting Iceberg metadata for the current snapshot 
(#3559)
    
    * [GOBBLIN-1701] Replace jcenter with either maven central or gradle plugin 
portal (#3554)
    
    * remove jcenter
    * Use gradle plugin portal for shadow
    * Use maven central in all other cases
    
    * [GOBBLIN-1695] Fix: Failure to add spec executors doesn't block 
deployment (#3551)
    
    * Allow first time failure to authenticate with Azkaban to fail silently
    
    * Fix findbugs report
    
    * Refactor azkaban authentication into function. Call on init and if 
session_id is null when adding a flow
    
    * Add handling for fetchSession throwing an exception
    
    * Add logging when fails on constructor and initialization, but continue to 
local deploy
    
    * Revert changes for azkabanSpecProducer, but quiet log instead of throw in 
constructor
    
    * Fixed vars
    
    * Revert changes on azkabanSpecProducer
    
    * clean up error throwing
    
    * revert function checking changes
    
    * Reformat file
    
    * Clean up function
    
    * Format file for try/catch
    
    * Allow first time failure to authenticate with Azkaban to fail silently
    
    * Fix findbugs report
    
    * Refactor azkaban authentication into function. Call on init and if 
session_id is null when adding a flow
    
    * Fixed rebase
    
    * Fixed rebase
    
    * Revert changes for azkabanSpecProducer, but quiet log instead of throw in 
constructor
    
    * Add whitespace back
    
    * fix helix job wait completion bug when job goes to STOPPING state (#3556)
    
    address comments
    
    update stoppingStateEndTime with currentTime
    
    update test cases
    
    * [GOBBLIN-1699] Log progress of reducer task for visibility with slow 
compaction jobs #3552
    
    * before starting reduce
    * after first record is reduced
    * after reducing every 1000 records
    
    Co-authored-by: Urmi Mustafi <[email protected]>
    
    * Define basics for collecting Iceberg metadata for the current snapshot
    
    * [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between 
task runner / application master for Dynamic work unit allocation (#3539)
    
    * [GOBBLIN-1673] Schema for dynamic work unit message
    
    * [GOBBLIN-1683] Dynamic Work Unit messaging abstractions
    
    * Address review comments
    
    * Correct import order
    
    Co-authored-by: Matthew Ho <[email protected]>
    Co-authored-by: Andy Jiang <[email protected]>
    Co-authored-by: Hanghang Nate Liu <[email protected]>
    Co-authored-by: umustafi <[email protected]>
    Co-authored-by: Urmi Mustafi <[email protected]>
    Co-authored-by: William Lo <[email protected]>
---
 .../management/copy/iceberg/IcebergCatalog.java    | 26 +++++++
 .../copy/iceberg/IcebergCatalogFactory.java        | 31 ++++++++
 .../copy/iceberg/IcebergHiveCatalog.java           | 40 ++++++++++
 .../copy/iceberg/IcebergSnapshotInfo.java          | 61 +++++++++++++++
 .../data/management/copy/iceberg/IcebergTable.java | 86 ++++++++++++++++++++++
 5 files changed, 244 insertions(+)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
new file mode 100644
index 000000000..70423e6a8
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+
+/**
+ * A catalog through which {@link IcebergTable}s can be looked up and opened.
+ */
+public interface IcebergCatalog {
+
+  /**
+   * Opens the table identified by the given database and table names.
+   *
+   * @param dbName name of the database holding the table
+   * @param tableName name of the table within {@code dbName}
+   * @return an {@link IcebergTable} for the named table
+   */
+  IcebergTable openTable(String dbName, String tableName);
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
new file mode 100644
index 000000000..43dff9fc6
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hive.HiveCatalogs;
+
+
+/**
+ * Static factory providing an {@link IcebergCatalog}; as written it always hands back an {@link IcebergHiveCatalog}.
+ */
+public final class IcebergCatalogFactory {
+
+  /** Not instantiable: this class only hosts the static {@link #create(Configuration)} factory method. */
+  private IcebergCatalogFactory() {
+  }
+
+  /**
+   * Creates an {@link IcebergCatalog} wrapping the Hive catalog obtained from
+   * {@link HiveCatalogs#loadCatalog(Configuration)}.
+   *
+   * @param configuration Hadoop configuration passed straight through to {@code HiveCatalogs.loadCatalog}
+   * @return a new {@link IcebergHiveCatalog}, exposed as an {@link IcebergCatalog}
+   */
+  public static IcebergCatalog create(Configuration configuration) {
+    return new IcebergHiveCatalog(HiveCatalogs.loadCatalog(configuration));
+  }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
new file mode 100644
index 000000000..d8ffdb799
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hive.HiveCatalog;
+
+
+/**
+ * Hive-Metastore-based {@link IcebergCatalog}.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergHiveCatalog implements IcebergCatalog {
+  // NOTE: specifically necessitates `HiveCatalog`, as 
`BaseMetastoreCatalog.newTableOps` is `protected`!
+  private final HiveCatalog hc;
+
+  @Override
+  public IcebergTable openTable(String dbName, String tableName) {
+    return new IcebergTable(hc.newTableOps(TableIdentifier.of(dbName, 
tableName)));
+  }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
new file mode 100644
index 000000000..c51c1a27d
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import lombok.Data;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * Value object naming the file paths that make up one Iceberg snapshot: its metadata file, its manifest list,
+ * each manifest file, and the file paths each manifest lists.
+ */
+@Data
+public class IcebergSnapshotInfo {
+
+  /** One manifest file's own path, paired with the file paths listed inside that manifest. */
+  @Data
+  public static class ManifestFileInfo {
+    private final String manifestFilePath;
+    private final List<String> listedFilePaths;
+  }
+
+  // NOTE: field order is significant, since the Lombok-generated constructor takes its arguments in this order
+  private final Long snapshotId;
+  private final Instant timestamp;
+  private final String metadataPath;
+  private final String manifestListPath;
+  private final List<ManifestFileInfo> manifestFiles;
+
+  /** @return the path of every manifest file, in the order of {@code manifestFiles} */
+  public List<String> getManifestFilePaths() {
+    List<String> manifestPaths = Lists.newArrayList();
+    for (ManifestFileInfo manifestFile : manifestFiles) {
+      manifestPaths.add(manifestFile.getManifestFilePath());
+    }
+    return manifestPaths;
+  }
+
+  /** @return the listed file paths of every manifest, concatenated in the order of {@code manifestFiles} */
+  public List<String> getAllDataFilePaths() {
+    return manifestFiles.stream()
+        .flatMap(manifestFile -> manifestFile.getListedFilePaths().stream())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * @return every path of the snapshot in one list: the metadata path, then the manifest list path, then all
+   *         manifest file paths, and finally all listed file paths
+   */
+  public List<String> getAllPaths() {
+    List<String> allPaths = Lists.newArrayList();
+    allPaths.add(metadataPath);
+    allPaths.add(manifestListPath);
+    allPaths.addAll(getManifestFilePaths());
+    allPaths.addAll(getAllDataFilePaths());
+    return allPaths;
+  }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
new file mode 100644
index 000000000..f6ff42698
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
+import static 
org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInfo.ManifestFileInfo;
+
+
+/**
+ * Exposes metadata information for a single Iceberg table, read through the table's {@link TableOperations}.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergTable {
+  private final TableOperations tableOps;
+
+  /**
+   * Collects the metadata file location, manifest list location, manifest file paths, and the file paths each
+   * manifest lists, all for the table's current snapshot.
+   *
+   * @return the {@link IcebergSnapshotInfo} describing the current snapshot
+   * @throws IOException if reading a manifest's paths fails, e.g. when closing its reader
+   */
+  public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
+    TableMetadata current = tableOps.current();
+    // NOTE(review): presumably `currentSnapshot()` is `null` for a table that has no snapshot yet, which would
+    // surface as a `NullPointerException` just below - confirm against the Iceberg API
+    Snapshot snapshot = current.currentSnapshot();
+    List<ManifestFile> manifests = snapshot.allManifests();
+    return new IcebergSnapshotInfo(
+        snapshot.snapshotId(),
+        Instant.ofEpochMilli(snapshot.timestampMillis()),
+        current.metadataFileLocation(),
+        snapshot.manifestListLocation(),
+        // NOTE: unable to `.stream().map(m -> calcManifestFileInfo(m, tableOps.io()))` due to checked exception
+        calcAllManifestFileInfo(manifests, tableOps.io())
+      );
+  }
+
+  /**
+   * Builds one {@link ManifestFileInfo} per manifest, preserving the order of {@code manifests}.
+   *
+   * @param manifests the manifest files to inspect
+   * @param io the {@link FileIO} through which each manifest is read
+   * @return a new mutable list holding one entry per element of {@code manifests}
+   * @throws IOException if reading any manifest's paths fails
+   */
+  @VisibleForTesting
+  static List<ManifestFileInfo> calcAllManifestFileInfo(List<ManifestFile> manifests, FileIO io) throws IOException {
+    List<ManifestFileInfo> result = Lists.newArrayList();
+    for (ManifestFile manifest : manifests) {
+      result.add(calcManifestFileInfo(manifest, io));
+    }
+    return result;
+  }
+
+  /**
+   * Pairs a manifest's own path with the file paths listed inside it.
+   *
+   * @param manifest the manifest file to inspect
+   * @param io the {@link FileIO} through which the manifest is read
+   * @return the {@link ManifestFileInfo} for {@code manifest}
+   * @throws IOException if reading the manifest's paths fails
+   */
+  @VisibleForTesting
+  static IcebergSnapshotInfo.ManifestFileInfo calcManifestFileInfo(ManifestFile manifest, FileIO io) throws IOException {
+    return new ManifestFileInfo(manifest.path(), discoverDataFilePaths(manifest, io));
+  }
+
+  /**
+   * Reads every path that {@link ManifestFiles#readPaths(ManifestFile, FileIO)} yields for {@code manifest} into
+   * a list.
+   *
+   * @param manifest the manifest file whose listed paths to read
+   * @param io the {@link FileIO} through which the manifest is read
+   * @return a new mutable list of the paths, in iteration order
+   * @throws IOException if closing the underlying iterable fails
+   */
+  @VisibleForTesting
+  static List<String> discoverDataFilePaths(ManifestFile manifest, FileIO io) throws IOException {
+    // try-with-resources (rather than `finally { close() }`) so that a failure while closing can never replace
+    // an exception thrown while reading; the close failure is attached as a suppressed exception instead
+    try (CloseableIterable<String> manifestPathsIterable = ManifestFiles.readPaths(manifest, io)) {
+      return Lists.newArrayList(manifestPathsIterable);
+    }
+  }
+}

Reply via email to