This is an automated email from the ASF dual-hosted git repository.

ihuzenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit fc3d921f0783cc2044ac4ed1f1a144231b5adbad
Author: Arina Ielchiieva <[email protected]>
AuthorDate: Thu Jul 25 19:22:47 2019 +0300

    DRILL-7331: Drill Iceberg Metastore metadata expiration
    
    closes #1831
---
 .../apache/drill/exec/server/DrillbitContext.java  |   1 +
 metastore/iceberg-metastore/README.md              |  25 +-
 metastore/iceberg-metastore/pom.xml                |   2 +-
 .../drill/metastore/iceberg/IcebergMetastore.java  |  29 ++-
 .../drill/metastore/iceberg/MetastoreContext.java  |   8 +
 .../iceberg/components/tables/IcebergTables.java   |  10 +-
 .../iceberg/config/IcebergConfigConstants.java     |   5 +
 .../iceberg/operate/ExpirationHandler.java         | 276 +++++++++++++++++++++
 .../metastore/iceberg/operate/IcebergModify.java   |   3 +
 .../src/main/resources/drill-metastore-module.conf |  74 +++---
 .../iceberg/components/tables/TestTableKey.java    |   2 +-
 .../tables/TestTablesOperationTransformer.java     |   5 +-
 .../iceberg/operate/TestExpirationHandler.java     | 143 +++++++++++
 .../java/org/apache/drill/metastore/Metastore.java |   2 +-
 .../apache/drill/metastore/MetastoreRegistry.java  |   9 +-
 .../main/resources/drill-metastore-default.conf    |   2 +-
 16 files changed, 536 insertions(+), 60 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 397e7bc..b3cd6f0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -300,6 +300,7 @@ public class DrillbitContext implements AutoCloseable {
     getFunctionImplementationRegistry().close();
     getRemoteFunctionRegistry().close();
     getCompiler().close();
+    metastoreRegistry.close();
   }
 
   public ResourceManager getResourceManager() {
diff --git a/metastore/iceberg-metastore/README.md 
b/metastore/iceberg-metastore/README.md
index 7d805c1..cc46c2a 100644
--- a/metastore/iceberg-metastore/README.md
+++ b/metastore/iceberg-metastore/README.md
@@ -15,6 +15,10 @@ can be overwritten in `drill-metastore-distrib.conf` or  
`drill-metastore-overri
 `drill.metastore.iceberg.location.base_path` and 
`drill.metastore.iceberg.location.relative_path` -
 indicate Iceberg Metastore base location.
 
+`drill.metastore.iceberg.expiration.period` - specifies the period after which outdated Iceberg table
+metadata is expired. Unit names must correspond to `java.time.temporal.ChronoUnit` enum values
+accepted by `java.time.Duration`: units with an exact duration (millis, seconds, minutes, hours) or days.
+
 `drill.metastore.iceberg.components` - provides configuration for specific 
Metastore components:
 `drill.metastore.iceberg.components.tables`, 
`drill.metastore.iceberg.components.views`.
 
@@ -162,7 +166,7 @@ To delete data from Iceberg table, caller provides filter 
by which data will be
 Filter expression must be based on component partition keys.
 
 Delete operation removes partitions from Iceberg table, it does not remove 
data files to which
-these partitions were pointing.
+these partitions were pointing. Outdated data files will be deleted during the expiration process.
 
 If delete operation was successful, Iceberg table generates new snapshot and 
updates
 its own metadata.
@@ -170,9 +174,16 @@ its own metadata.
 ### Purge
 
 Allows to delete all data from Iceberg table. During this operation Iceberg 
table
-is not deleted, history of all operations and data files are preserved.
-
-## Data cleanup
-
-Iceberg table provides ability to remove outdated data files and snapshots 
-when they are no longer needed. Such support in Iceberg Metastore will be 
added later.
+is not deleted, history of all operations and data files are preserved until
+the expiration process is launched.
+
+## Iceberg metadata expiration
+
+An Iceberg table generates metadata for each modification operation:
+snapshot, manifest file, table metadata file. Also, when performing a delete operation,
+previously stored data files are not deleted. Over time, these files
+can occupy a lot of space. `ExpirationHandler` expires outdated metadata and
+data files after the configured time period (`drill.metastore.iceberg.expiration.period`).
+If the expiration period is not set, or is zero or negative, expiration is not performed.
+`ExpirationHandler` is called after each modification operation; it checks whether the expiration period
+has elapsed and, if so, submits the expiration process to a separate thread.
diff --git a/metastore/iceberg-metastore/pom.xml 
b/metastore/iceberg-metastore/pom.xml
index 1caaa7c..f8544fd 100644
--- a/metastore/iceberg-metastore/pom.xml
+++ b/metastore/iceberg-metastore/pom.xml
@@ -33,7 +33,7 @@
   <name>metastore/Drill Iceberg Metastore</name>
 
   <properties>
-    <iceberg.version>cfd2737</iceberg.version>
+    <iceberg.version>77a456a</iceberg.version>
     <caffeine.version>2.7.0</caffeine.version>
   </properties>
 
diff --git 
a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
 
b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
index 6cd375c..e223c5a 100644
--- 
a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
+++ 
b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
@@ -25,6 +25,7 @@ import org.apache.drill.metastore.components.views.Views;
 import org.apache.drill.metastore.iceberg.components.tables.IcebergTables;
 import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
 import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
+import org.apache.drill.metastore.iceberg.operate.ExpirationHandler;
 import org.apache.drill.metastore.iceberg.schema.IcebergTableSchema;
 import org.apache.drill.shaded.guava.com.google.common.collect.MapDifference;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -58,6 +59,7 @@ public class IcebergMetastore implements Metastore {
   private final org.apache.iceberg.Tables tables;
   private final String baseLocation;
   private final Map<String, String> commonProperties;
+  private final ExpirationHandler expirationHandler;
 
   /**
    * Table properties for each Iceberg table should be updated only once,
@@ -71,10 +73,11 @@ public class IcebergMetastore implements Metastore {
 
   public IcebergMetastore(DrillConfig config) {
     this.config = config;
-    Configuration configuration = configuration(config);
+    Configuration configuration = configuration();
     this.tables = new HadoopTables(new Configuration(configuration));
-    this.baseLocation = baseLocation(config, new Configuration(configuration));
-    this.commonProperties = properties(config, 
IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES);
+    this.baseLocation = baseLocation(new Configuration(configuration));
+    this.commonProperties = 
properties(IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES);
+    this.expirationHandler = new ExpirationHandler(config, new 
Configuration(configuration));
   }
 
   @Override
@@ -82,7 +85,7 @@ public class IcebergMetastore implements Metastore {
     Table table = loadTable(IcebergConfigConstants.COMPONENTS_TABLES_LOCATION,
       IcebergConfigConstants.COMPONENTS_TABLES_PROPERTIES,
       IcebergTables.SCHEMA, Tables.class);
-    return new IcebergTables(table);
+    return new IcebergTables(table, expirationHandler);
   }
 
   @Override
@@ -94,10 +97,9 @@ public class IcebergMetastore implements Metastore {
    * Initializes {@link Configuration} based on config properties.
    * if config properties are not indicated, returns default instance.
    *
-   * @param config Drill config
    * @return {@link Configuration} instance
    */
-  private Configuration configuration(DrillConfig config) {
+  private Configuration configuration() {
     Configuration configuration = new Configuration();
     if (config.hasPath(IcebergConfigConstants.CONFIG_PROPERTIES)) {
       Config configProperties = 
config.getConfig(IcebergConfigConstants.CONFIG_PROPERTIES);
@@ -113,13 +115,12 @@ public class IcebergMetastore implements Metastore {
    * If {@link IcebergConfigConstants#BASE_PATH} is not set, user home 
directory is used.
    * {@link IcebergConfigConstants#RELATIVE_PATH} must be set.
    *
-   * @param config Drill config
    * @param configuration Hadoop configuration
    * @return Iceberg table base location
    * @throws IcebergMetastoreException if unable to init file system
    *         or Iceberg Metastore relative path is not indicated
    */
-  private String baseLocation(DrillConfig config, Configuration configuration) 
{
+  private String baseLocation(Configuration configuration) {
     FileSystem fs;
     try {
       fs = FileSystem.get(configuration);
@@ -145,14 +146,13 @@ public class IcebergMetastore implements Metastore {
   }
 
   /**
-   * Collects properties name and values into map if they are present in the 
given config,
+   * Collects properties name and values into map if they are present in the 
config,
    * returns empty map otherwise.
    *
-   * @param config Drill config
    * @param propertiesPath path to properties in the config
    * @return map with properties names and their values
    */
-  private Map<String, String> properties(DrillConfig config, String 
propertiesPath) {
+  private Map<String, String> properties(String propertiesPath) {
     return config.hasPath(propertiesPath)
       ? config.getConfig(propertiesPath).entrySet().stream()
       .collect(Collectors.toMap(
@@ -230,7 +230,7 @@ public class IcebergMetastore implements Metastore {
    */
   private Map<String, String> tableProperties(String 
componentPropertiesConfig) {
     Map<String, String> properties = new HashMap<>(commonProperties);
-    properties.putAll(properties(config, componentPropertiesConfig));
+    properties.putAll(properties(componentPropertiesConfig));
     return properties;
   }
 
@@ -270,4 +270,9 @@ public class IcebergMetastore implements Metastore {
 
     updateProperties.commit();
   }
+
+  /**
+   * Releases resources held by this Metastore instance:
+   * delegates to the expiration handler so that its executor is shut down.
+   */
+  @Override
+  public void close() {
+    this.expirationHandler.close();
+  }
 }
diff --git 
a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/MetastoreContext.java
 
b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/MetastoreContext.java
index c4806d8..933910d 100644
--- 
a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/MetastoreContext.java
+++ 
b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/MetastoreContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.metastore.iceberg;
 
+import org.apache.drill.metastore.iceberg.operate.ExpirationHandler;
 import org.apache.drill.metastore.iceberg.transform.Transformer;
 import org.apache.drill.metastore.iceberg.write.FileWriter;
 import org.apache.iceberg.Table;
@@ -50,4 +51,11 @@ public interface MetastoreContext<T> {
    * @return transformer instance
    */
   Transformer<T> transformer();
+
+  /**
+   * Returns expiration handler that expires Iceberg table metadata.
+   * Modification operations call {@link ExpirationHandler#expire} with
+   * the context table after committing their transaction.
+   *
+   * @return expiration handler instance
+   */
+  ExpirationHandler expirationHandler();
 }
diff --git 
a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
 
b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
index c28c0ec..f4d32e9 100644
--- 
a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
+++ 
b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
@@ -18,6 +18,7 @@
 package org.apache.drill.metastore.iceberg.components.tables;
 
 import org.apache.drill.metastore.components.tables.Tables;
+import org.apache.drill.metastore.iceberg.operate.ExpirationHandler;
 import org.apache.drill.metastore.operate.Metadata;
 import org.apache.drill.metastore.operate.Modify;
 import org.apache.drill.metastore.operate.Read;
@@ -55,9 +56,11 @@ public class IcebergTables implements Tables, 
MetastoreContext<TableMetadataUnit
   public static IcebergTableSchema SCHEMA = 
IcebergTableSchema.of(TableMetadataUnit.class, PARTITION_KEYS);
 
   private final Table table;
+  private final ExpirationHandler expirationHandler;
 
-  public IcebergTables(Table table) {
+  public IcebergTables(Table table, ExpirationHandler expirationHandler) {
     this.table = table;
+    this.expirationHandler = expirationHandler;
   }
 
   public MetastoreContext<TableMetadataUnit> context() {
@@ -93,4 +96,9 @@ public class IcebergTables implements Tables, 
MetastoreContext<TableMetadataUnit
   public Transformer<TableMetadataUnit> transformer() {
     return new TablesTransformer(context());
   }
+
+  /**
+   * Provides the expiration handler this component was constructed with.
+   *
+   * @return shared expiration handler instance
+   */
+  @Override
+  public ExpirationHandler expirationHandler() {
+    return this.expirationHandler;
+  }
 }
diff --git 
a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
 
b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
index 6f326ac..4419f68 100644
--- 
a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
+++ 
b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
@@ -52,6 +52,11 @@ public interface IcebergConfigConstants {
   String RELATIVE_PATH = LOCATION_NAMESPACE + "relative_path";
 
   /**
+   * Defines config which provides expiration period value.
+   */
+  String EXPIRATION_PERIOD = BASE + "expiration.period";
+
+  /**
    * Drill Iceberg Metastore components configuration properties namespace.
    */
   String COMPONENTS = BASE + "components.";
diff --git 
a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
 
b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
new file mode 100644
index 0000000..f06d496
--- /dev/null
+++ 
b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.iceberg.operate;
+
+import com.typesafe.config.ConfigValue;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
+import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.time.DateTimeException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Iceberg table generates metadata for each modification operation:
+ * snapshot, manifest file, table metadata file. Also when performing delete operation,
+ * previously stored data files are not deleted. Over time these files
+ * can occupy lots of space.
+ * <p>
+ * Expiration handler expires outdated metadata and data files after configured expiration period.
+ * Expiration period is set in the Iceberg Metastore config {@link IcebergConfigConstants#EXPIRATION_PERIOD}.
+ * Unit names should correspond to {@link ChronoUnit} values accepted by {@link Duration}:
+ * units with exact duration (millis, seconds, minutes, hours) or days (treated as 24 hours).
+ * If expiration period is not set, zero or negative, expiration process will not be executed.
+ * <p>
+ * Expiration process is launched using executor service which allows to execute only one thread at a time,
+ * idle thread is not kept in the core pool since it is assumed that expiration process won't be launched too often.
+ * Failures of the expiration process are logged rather than propagated to the modification operation.
+ * <p>
+ * During Drillbit shutdown, if there are expiration tasks in the queue, they will be discarded in order to
+ * unblock Drillbit shutdown process.
+ */
+public class ExpirationHandler implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(ExpirationHandler.class);
+
+  private static final Pattern METADATA_VERSION_PATTERN = Pattern.compile("^v([0-9]+)\\..*");
+
+  // Iceberg table location -> time in millis when the table was first seen
+  // or when its expiration process was last launched
+  private final Map<String, Long> expirationStatus = new ConcurrentHashMap<>();
+  private final Configuration configuration;
+  private final long expirationPeriod;
+  private volatile ExecutorService executorService;
+
+  public ExpirationHandler(DrillConfig config, Configuration configuration) {
+    this.configuration = configuration;
+    this.expirationPeriod = expirationPeriod(config);
+    logger.debug("Drill Iceberg Metastore expiration period: {}", expirationPeriod);
+  }
+
+  /**
+   * Checks if expiration process needs to be performed for the given Iceberg table
+   * by comparing stored last expiration time with the current time.
+   * The first call for a table only records the current time.
+   * If difference between last expiration time and current time is more or equal to
+   * expiration period, submits expiration process to the executor service.
+   * If expiration period is zero or negative, no expiration process will be launched.
+   * <p>
+   * When several threads call this method concurrently for the same table,
+   * at most one of them launches the expiration process.
+   *
+   * @param table Iceberg table instance
+   * @return true if expiration process was launched, false otherwise
+   */
+  public boolean expire(Table table) {
+    if (expirationPeriod <= 0) {
+      return false;
+    }
+
+    String location = table.location();
+    long current = System.currentTimeMillis();
+    Long last = expirationStatus.putIfAbsent(location, current);
+
+    if (last == null || current - last < expirationPeriod) {
+      return false;
+    }
+
+    // atomic compare-and-set: if another thread has already updated the time
+    // after we read it, that thread launches expiration and this one backs off
+    if (!expirationStatus.replace(location, last, current)) {
+      return false;
+    }
+
+    try {
+      executorService().submit(() -> expireTable(table, current));
+      return true;
+    } catch (RejectedExecutionException e) {
+      // executor service has been shut down by close(), modification itself has already succeeded
+      logger.warn("Unable to submit Iceberg table [{}] metadata expiration", location, e);
+      return false;
+    }
+  }
+
+  /**
+   * @return expiration period in milliseconds, zero or negative if expiration is disabled
+   */
+  public long expirationPeriod() {
+    return expirationPeriod;
+  }
+
+  @Override
+  public void close() {
+    if (executorService != null) {
+      // unlike shutdown(), shutdownNow() discards all queued waiting tasks
+      // this is done in order to unblock Drillbit shutdown
+      executorService.shutdownNow();
+    }
+  }
+
+  /**
+   * Expires Iceberg table snapshots older than the given time, then outdated table metadata files.
+   * Runs in the executor service thread; since the returned future is discarded,
+   * failures are logged here, otherwise they would be silently lost.
+   *
+   * @param table Iceberg table instance
+   * @param expirationTime time in millis, snapshots older than it are expired
+   */
+  private void expireTable(Table table, long expirationTime) {
+    logger.debug("Expiring Iceberg table [{}] metadata", table.location());
+    try {
+      table.expireSnapshots()
+        .expireOlderThan(expirationTime)
+        .commit();
+      // TODO: Replace with table metadata expiration through Iceberg API
+      //       when https://github.com/apache/incubator-iceberg/issues/181 is resolved
+      //       table.expireTableMetadata().expireOlderThan(current).commit();
+      expireTableMetadata(table);
+    } catch (RuntimeException e) {
+      logger.warn("Unable to expire Iceberg table [{}] metadata", table.location(), e);
+    }
+  }
+
+  /**
+   * Sums all configured expiration period units into a single duration.
+   *
+   * @param config Drill config
+   * @return expiration period in millis, 0 if expiration period is not configured
+   * @throws IcebergMetastoreException if expiration period config cannot be parsed
+   */
+  private long expirationPeriod(DrillConfig config) {
+    if (config.hasPath(IcebergConfigConstants.EXPIRATION_PERIOD)) {
+      Duration duration = config.getConfig(IcebergConfigConstants.EXPIRATION_PERIOD).entrySet().stream()
+        .map(this::duration)
+        .reduce(Duration.ZERO, Duration::plus);
+      return duration.toMillis();
+    }
+    return 0;
+  }
+
+  /**
+   * Converts single expiration period config entry (unit name -> amount) into duration.
+   *
+   * @param entry config entry where key is unit name and value is amount
+   * @return duration corresponding to the entry
+   * @throws IcebergMetastoreException if amount is not a long, unit name is not a {@link ChronoUnit}
+   *         or unit has estimated duration (weeks, months, etc.)
+   */
+  private Duration duration(Map.Entry<String, ConfigValue> entry) {
+    String amountText = String.valueOf(entry.getValue().unwrapped());
+    // Locale.ROOT keeps conversion independent of the default locale (e.g. Turkish dotted / dotless i)
+    String unitText = entry.getKey().toUpperCase(Locale.ROOT);
+    try {
+      long amount = Long.parseLong(amountText);
+      ChronoUnit unit = ChronoUnit.valueOf(unitText);
+      return Duration.of(amount, unit);
+    } catch (NumberFormatException e) {
+      throw new IcebergMetastoreException(String.format("Error when parsing expiration period config. " +
+        "Unable to convert [%s] into long", amountText), e);
+    } catch (IllegalArgumentException e) {
+      throw new IcebergMetastoreException(String.format("Error when parsing expiration period config. " +
+        "Unable to convert [%s] into [%s]", unitText, ChronoUnit.class.getCanonicalName()), e);
+    } catch (DateTimeException e) {
+      // Duration.of rejects units with estimated duration other than days
+      throw new IcebergMetastoreException(String.format("Error when parsing expiration period config. " +
+        "Unit [%s] has estimated duration and cannot be used", unitText), e);
+    }
+  }
+
+  /**
+   * Initializes executor service instance using DCL.
+   * Created thread executor instance allows to execute only one thread at a time
+   * but unlike single thread executor does not keep this thread in the pool.
+   * Custom thread factory is used to define Iceberg Metastore specific thread names.
+   *
+   * @return executor service instance
+   */
+  private ExecutorService executorService() {
+    if (executorService == null) {
+      synchronized (this) {
+        if (executorService == null) {
+          this.executorService = new ThreadPoolExecutor(0, 1, 0L,
+            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new IcebergThreadFactory());
+        }
+      }
+    }
+    return executorService;
+  }
+
+  /**
+   * Expires outdated Iceberg table metadata files.
+   * Reads current Iceberg table metadata version from version-hint.text file
+   * and deletes all metadata files that end with ".metadata.json" and have
+   * version less than current.
+   * <p>
+   * Should be replaced with
+   * <code>table.expireTableMetadata().expireOlderThan(current).commit();</code>
+   * when <a href="https://github.com/apache/incubator-iceberg/issues/181">Issue#181</a>
+   * is resolved.
+   *
+   * @param table Iceberg table instance
+   */
+  private void expireTableMetadata(Table table) {
+    try {
+      String location = table.location();
+      Path metadata = new Path(location, "metadata");
+      FileSystem fs = metadata.getFileSystem(configuration);
+      for (FileStatus fileStatus : listExpiredMetadataFiles(fs, metadata)) {
+        if (fs.delete(fileStatus.getPath(), false)) {
+          logger.debug("Deleted Iceberg table [{}] metadata file [{}]", table.location(), fileStatus.getPath());
+        }
+      }
+    } catch (NumberFormatException | IOException e) {
+      logger.warn("Unable to expire Iceberg table [{}] metadata files", table.location(), e);
+    }
+  }
+
+  /**
+   * Reads current Iceberg table metadata version from version-hint.text file
+   * and returns all metadata files that end with ".metadata.json" and have
+   * version less than current. Files whose version cannot be parsed are skipped.
+   *
+   * @param fs file system
+   * @param metadata path to Iceberg metadata
+   * @return metadata files with version less than current
+   * @throws IOException in case of error reading version hint or listing file statuses
+   */
+  private FileStatus[] listExpiredMetadataFiles(FileSystem fs, Path metadata) throws IOException {
+    int currentVersion = currentVersion(fs, metadata);
+    return fs.listStatus(metadata, path -> {
+      if (path.getName().endsWith(".metadata.json")) {
+        int version = parseVersion(path);
+        return version != -1 && currentVersion > version;
+      }
+      return false;
+    });
+  }
+
+  /**
+   * Reads current table metadata version from version-hint.text file.
+   *
+   * @param fs file system
+   * @param metadata table metadata path
+   * @return current table metadata version
+   * @throws IOException if unable to read current table metadata version or the file is empty
+   * @throws NumberFormatException if version hint content is not an int
+   */
+  private int currentVersion(FileSystem fs, Path metadata) throws IOException {
+    Path versionHintFile = new Path(metadata, "version-hint.text");
+    try (BufferedReader in = new BufferedReader(
+      new InputStreamReader(fs.open(versionHintFile), StandardCharsets.UTF_8))) {
+      String line = in.readLine();
+      if (line == null) {
+        throw new IOException("Iceberg table version hint file is empty: " + versionHintFile);
+      }
+      // readLine() already strips line terminators, trim() removes any surrounding whitespace
+      return Integer.parseInt(line.trim());
+    }
+  }
+
+  /**
+   * Extracts metadata version from table metadata file name.
+   * Example: v1.metadata.json -> 1, v15.metadata.json -> 15
+   *
+   * @param path table metadata file path
+   * @return metadata version or -1 if file name does not follow v[version].* naming
+   *         or version does not fit into int
+   */
+  private int parseVersion(Path path) {
+    Matcher matcher = METADATA_VERSION_PATTERN.matcher(path.getName());
+    if (matcher.find()) {
+      try {
+        return Integer.parseInt(matcher.group(1));
+      } catch (NumberFormatException e) {
+        logger.debug("Unable to parse version for path {}", path, e);
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Wraps default thread factory and adds Iceberg Metastore prefix to the original thread name.
+   * Is used to uniquely identify Iceberg metastore threads.
+   * Example: drill-iceberg-metastore-pool-1-thread-1
+   */
+  private static class IcebergThreadFactory implements ThreadFactory {
+
+    private static final String THREAD_PREFIX = "drill-iceberg-metastore-";
+    private final ThreadFactory delegate = Executors.defaultThreadFactory();
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+      Thread thread = delegate.newThread(runnable);
+      thread.setName(THREAD_PREFIX + thread.getName());
+      return thread;
+    }
+  }
+}
diff --git 
a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
 
b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
index a05c2dc..a510f86 100644
--- 
a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
+++ 
b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
@@ -79,5 +79,8 @@ public class IcebergModify<T> implements Modify<T> {
     Transaction transaction = context.table().newTransaction();
     operations.forEach(op -> op.add(transaction));
     transaction.commitTransaction();
+
+    // check if Iceberg table metadata needs to be expired after each 
modification operation
+    context.expirationHandler().expire(context.table());
   }
 }
diff --git 
a/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf 
b/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf
index 182d029..be98a97 100644
--- a/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf
+++ b/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf
@@ -16,45 +16,53 @@
 
 drill.metastore.iceberg: {
 
-    // File system config can be specified
-    config.properties: {
-      // Iceberg tables location will be created under default file system
-      // taken from Hadoop configuration unless the below property is set
-      // fs.defaultFS: "file:///"
-    }
+  // File system config can be specified
+  config.properties: {
+    // Iceberg tables location will be created under default file system
+    // taken from Hadoop configuration unless the below property is set
+    // fs.defaultFS: "file:///"
+  }
 
-    // Iceberg Metastore location is constructed based on
-    // combination of base_path and relative_path config values
-    location: {
-      // Iceberg table base path, if not indicated, user home directory will 
be used
-      // base_path: "/app",
+  // Iceberg Metastore location is constructed based on
+  // combination of base_path and relative_path config values
+  location: {
+    // Iceberg table base path, if not indicated, user home directory will be 
used
+    // base_path: "/app",
 
-      // Iceberg table relative path unique among clusters
-      // ${drill.exec.zk.root} value will be substituted from Drill main config
-       relative_path: ${drill.exec.zk.root}"/metastore/iceberg"
-    }
+    // Iceberg table relative path unique among clusters
+    // ${drill.exec.zk.root} value will be substituted from Drill main config
+    relative_path: ${drill.exec.zk.root}"/metastore/iceberg"
+  }
+
+  // Specifies time period after which outdated Iceberg table metadata will be expired.
+  // Unit names must correspond to java.time.temporal.ChronoUnit enum values accepted by
+  // java.time.Duration: units with exact duration (millis, seconds, minutes, hours)
+  // or days (treated as 24 hours). Zero or negative period disables expiration.
+  // Example: hours: 10, minutes: 20
+  expiration.period: {
+    days: 5
+  }
 
-    components: {
-        // Common properties for all Iceberg tables from 
org.apache.iceberg.TableProperties can be specified
-       common.properties: {
-          write.format.default: "parquet",
-          write.metadata.compression-codec: "none"
-       },
+  components: {
+    // Common properties for all Iceberg tables from 
org.apache.iceberg.TableProperties can be specified
+    common.properties: {
+      write.format.default: "parquet",
+      write.metadata.compression-codec: "none"
+    },
 
-       tables: {
-          // Iceberg table location in Iceberg Metastore
-          location: "tables",
+    tables: {
+      // Iceberg table location in Iceberg Metastore
+      location: "tables",
 
-          // Specific tables properties from 
org.apache.iceberg.TableProperties can be specified
-          properties: {}
-       },
+      // Specific tables properties from org.apache.iceberg.TableProperties 
can be specified
+      properties: {}
+    },
 
-       views: {
-          // Iceberg table location in Iceberg Metastore
-          location: "views",
+    views: {
+      // Iceberg table location in Iceberg Metastore
+      location: "views",
 
-          // Specific views properties from org.apache.iceberg.TableProperties 
can be specified
-          properties: {}
-       }
+      // Specific views properties from org.apache.iceberg.TableProperties can 
be specified
+      properties: {}
     }
+  }
 }
diff --git 
a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTableKey.java
 
b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTableKey.java
index c84a3b9..3e96ca2 100644
--- 
a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTableKey.java
+++ 
b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTableKey.java
@@ -48,7 +48,7 @@ public class TestTableKey extends IcebergBaseTest {
     TableKey tableKey = new TableKey("dfs", "tmp", "nation");
 
     String expected = new Path(
-      Paths.get("/metastore", "dfs", "tmp", "nation").toUri().getPath())
+      Paths.get("/metastore", "dfs", "tmp", "nation").toFile().getPath())
       .toUri().getPath();
 
     assertEquals(expected, tableKey.toLocation("/metastore"));
diff --git 
a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTablesOperationTransformer.java
 
b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTablesOperationTransformer.java
index d7b72eb..80c6f77 100644
--- 
a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTablesOperationTransformer.java
+++ 
b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTablesOperationTransformer.java
@@ -70,9 +70,10 @@ public class TestTablesOperationTransformer extends 
IcebergBaseTest {
 
     assertEquals(expression.toString(), operation.filter().toString());
 
-    File file = new File(new 
Path(String.valueOf(operation.dataFile().path())).toUri().getPath());
+    Path path = new Path(String.valueOf(operation.dataFile().path()));
+    File file = new File(path.toUri().getPath());
     assertTrue(file.exists());
-    assertEquals(location, file.getParent());
+    assertEquals(location, path.getParent().toUri().getPath());
   }
 
   @Test
diff --git 
a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java
 
b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java
new file mode 100644
index 0000000..90ef55b
--- /dev/null
+++ 
b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.iceberg.operate;
+
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.iceberg.IcebergBaseTest;
+import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
+import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
+import org.apache.iceberg.Table;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestExpirationHandler extends IcebergBaseTest {
+
+  @Test
+  public void testConfigEmpty() {
+    ExpirationHandler expirationHandler = new 
ExpirationHandler(DrillConfig.create(), baseHadoopConfig());
+    assertEquals(0, expirationHandler.expirationPeriod());
+  }
+
+  @Test
+  public void testConfigOneUnit() {
+    DrillConfig config = new DrillConfig(DrillConfig.create()
+      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
+        ConfigValueFactory.fromAnyRef(5)));
+    ExpirationHandler expirationHandler = new ExpirationHandler(config, 
baseHadoopConfig());
+    assertEquals(TimeUnit.HOURS.toMillis(5), 
expirationHandler.expirationPeriod());
+  }
+
+  @Test
+  public void testConfigSeveralUnits() {
+    DrillConfig config = new DrillConfig(DrillConfig.create()
+      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
+        ConfigValueFactory.fromAnyRef(5))
+      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".minutes",
+        ConfigValueFactory.fromAnyRef(10)));
+    ExpirationHandler expirationHandler = new ExpirationHandler(config, 
baseHadoopConfig());
+    assertEquals(TimeUnit.HOURS.toMillis(5) + TimeUnit.MINUTES.toMillis(10),
+      expirationHandler.expirationPeriod());
+  }
+
+  @Test
+  public void testConfigNegativeValue() {
+    DrillConfig config = new DrillConfig(DrillConfig.create()
+      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
+        ConfigValueFactory.fromAnyRef(-5)));
+    ExpirationHandler expirationHandler = new ExpirationHandler(config, 
baseHadoopConfig());
+    assertEquals(TimeUnit.HOURS.toMillis(-5), 
expirationHandler.expirationPeriod());
+  }
+
+  @Test
+  public void testConfigIncorrectUnit() {
+    DrillConfig config = new DrillConfig(DrillConfig.create()
+      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hour",
+        ConfigValueFactory.fromAnyRef(5)));
+
+    thrown.expect(IcebergMetastoreException.class);
+    new ExpirationHandler(config, baseHadoopConfig());
+  }
+
+  @Test
+  public void testConfigIncorrectValue() {
+    DrillConfig config = new DrillConfig(DrillConfig.create()
+      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
+        ConfigValueFactory.fromAnyRef("abc")));
+
+    thrown.expect(IcebergMetastoreException.class);
+    new ExpirationHandler(config, baseHadoopConfig());
+  }
+
+  @Test
+  public void testExpireZeroExpirationPeriod() {
+    DrillConfig config = new DrillConfig(DrillConfig.create()
+      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".millis",
+        ConfigValueFactory.fromAnyRef(0)));
+
+    ExpirationHandler expirationHandler = new ExpirationHandler(config, 
baseHadoopConfig());
+    Table table = mock(Table.class);
+    assertFalse(expirationHandler.expire(table));
+  }
+
+  @Test
+  public void testExpireNegativeExpirationPeriod() {
+    DrillConfig config = new DrillConfig(DrillConfig.create()
+      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".millis",
+        ConfigValueFactory.fromAnyRef(-10)));
+
+    ExpirationHandler expirationHandler = new ExpirationHandler(config, 
baseHadoopConfig());
+    Table table = mock(Table.class);
+    assertFalse(expirationHandler.expire(table));
+  }
+
+  @Test
+  public void testExpireFirstTime() {
+    DrillConfig config = new DrillConfig(DrillConfig.create()
+      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".millis",
+        ConfigValueFactory.fromAnyRef(1)));
+
+    ExpirationHandler expirationHandler = new ExpirationHandler(config, 
baseHadoopConfig());
+
+    Table table = mock(Table.class);
+    when(table.location()).thenReturn("/tmp/table");
+
+    assertFalse(expirationHandler.expire(table));
+  }
+
+  @Test
+  public void testExpireBefore() {
+    DrillConfig config = new DrillConfig(DrillConfig.create()
+      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".days",
+        ConfigValueFactory.fromAnyRef(1)));
+
+    ExpirationHandler expirationHandler = new ExpirationHandler(config, 
baseHadoopConfig());
+
+    Table table = mock(Table.class);
+    when(table.location()).thenReturn("/tmp/table");
+
+    assertFalse(expirationHandler.expire(table));
+    assertFalse(expirationHandler.expire(table));
+  }
+}
diff --git 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/Metastore.java
 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/Metastore.java
index 7dad82d..2bbe461 100644
--- 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/Metastore.java
+++ 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/Metastore.java
@@ -29,7 +29,7 @@ import org.apache.drill.metastore.components.views.Views;
  * Besides implementing {@link Metastore}, Metastore implementation must have 
constructor
  * which accepts {@link org.apache.drill.common.config.DrillConfig}.
  */
-public interface Metastore {
+public interface Metastore extends AutoCloseable {
 
   /**
    * @return Metastore Tables component implementation
diff --git 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreRegistry.java
 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreRegistry.java
index 0343463..293ab56 100644
--- 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreRegistry.java
+++ 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreRegistry.java
@@ -32,7 +32,7 @@ import java.lang.invoke.MethodType;
  * Metastore initialization is delayed until {@link #get()} method is called.
  * Metastore implementation must have constructor which accepts {@link 
DrillConfig}.
  */
-public class MetastoreRegistry {
+public class MetastoreRegistry implements AutoCloseable {
 
   private DrillConfig config;
   private volatile Metastore metastore;
@@ -104,4 +104,11 @@ public class MetastoreRegistry {
   private DrillConfig createMetastoreConfig(DrillConfig config) {
     return DrillConfig.create(null, null, true, new MetastoreConfigFileInfo(), 
config.root());
   }
+
+  @Override
+  public void close() throws Exception {
+    if (metastore != null) {
+      metastore.close();
+    }
+  }
 }
diff --git 
a/metastore/metastore-api/src/main/resources/drill-metastore-default.conf 
b/metastore/metastore-api/src/main/resources/drill-metastore-default.conf
index beb4f0a..3ab55b1 100644
--- a/metastore/metastore-api/src/main/resources/drill-metastore-default.conf
+++ b/metastore/metastore-api/src/main/resources/drill-metastore-default.conf
@@ -15,5 +15,5 @@
 // limitations under the License.
 
 drill.metastore: {
-    implementation.class: "org.apache.drill.metastore.iceberg.IcebergMetastore"
+  implementation.class: "org.apache.drill.metastore.iceberg.IcebergMetastore"
 }

Reply via email to