This is an automated email from the ASF dual-hosted git repository.

rdsr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d1fc91  Remove thread local objects and use new visitors to fix executor memory leaks (#1169)
4d1fc91 is described below

commit 4d1fc91d6528db88548c506a0b110b50121d67ff
Author: jun-he <[email protected]>
AuthorDate: Mon Jul 6 10:49:24 2020 -0700

    Remove thread local objects and use new visitors to fix executor memory leaks (#1169)
---
 .../org/apache/iceberg/expressions/Evaluator.java  | 10 +---------
 .../expressions/InclusiveMetricsEvaluator.java     | 10 +---------
 .../iceberg/expressions/ManifestEvaluator.java     | 10 +---------
 .../iceberg/expressions/ResidualEvaluator.java     | 10 +---------
 .../expressions/StrictMetricsEvaluator.java        | 10 +---------
 .../parquet/ParquetDictionaryRowGroupFilter.java   | 23 ++--------------------
 .../parquet/ParquetMetricsRowGroupFilter.java      | 10 +---------
 7 files changed, 8 insertions(+), 75 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/expressions/Evaluator.java b/api/src/main/java/org/apache/iceberg/expressions/Evaluator.java
index 0d3e1ef..b80a13b 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/Evaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/Evaluator.java
@@ -35,14 +35,6 @@ import org.apache.iceberg.types.Types.StructType;
  */
 public class Evaluator implements Serializable {
   private final Expression expr;
-  private transient ThreadLocal<EvalVisitor> visitors = null;
-
-  private EvalVisitor visitor() {
-    if (visitors == null) {
-      this.visitors = ThreadLocal.withInitial(EvalVisitor::new);
-    }
-    return visitors.get();
-  }
 
   public Evaluator(StructType struct, Expression unbound) {
     this.expr = Binder.bind(struct, unbound, true);
@@ -53,7 +45,7 @@ public class Evaluator implements Serializable {
   }
 
   public boolean eval(StructLike data) {
-    return visitor().eval(data);
+    return new EvalVisitor().eval(data);
   }
 
   private class EvalVisitor extends BoundVisitor<Boolean> {
diff --git a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
index 148ac4f..7bad98c 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
@@ -47,14 +47,6 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot;
  */
 public class InclusiveMetricsEvaluator {
   private final Expression expr;
-  private transient ThreadLocal<MetricsEvalVisitor> visitors = null;
-
-  private MetricsEvalVisitor visitor() {
-    if (visitors == null) {
-      this.visitors = ThreadLocal.withInitial(MetricsEvalVisitor::new);
-    }
-    return visitors.get();
-  }
 
   public InclusiveMetricsEvaluator(Schema schema, Expression unbound) {
     this(schema, unbound, true);
@@ -73,7 +65,7 @@ public class InclusiveMetricsEvaluator {
    */
   public boolean eval(ContentFile<?> file) {
     // TODO: detect the case where a column is missing from the file using 
file's max field id.
-    return visitor().eval(file);
+    return new MetricsEvalVisitor().eval(file);
   }
 
   private static final boolean ROWS_MIGHT_MATCH = true;
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
index 4023644..5e8be15 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
@@ -50,14 +50,6 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot;
 public class ManifestEvaluator {
   private final StructType struct;
   private final Expression expr;
-  private transient ThreadLocal<ManifestEvalVisitor> visitors = null;
-
-  private ManifestEvalVisitor visitor() {
-    if (visitors == null) {
-      this.visitors = ThreadLocal.withInitial(ManifestEvalVisitor::new);
-    }
-    return visitors.get();
-  }
 
   public static ManifestEvaluator forRowFilter(Expression rowFilter, 
PartitionSpec spec, boolean caseSensitive) {
     return new ManifestEvaluator(spec, Projections.inclusive(spec, 
caseSensitive).project(rowFilter), caseSensitive);
@@ -80,7 +72,7 @@ public class ManifestEvaluator {
    * @return false if the file cannot contain rows that match the expression, 
true otherwise.
    */
   public boolean eval(ManifestFile manifest) {
-    return visitor().eval(manifest);
+    return new ManifestEvalVisitor().eval(manifest);
   }
 
   private static final boolean ROWS_MIGHT_MATCH = true;
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
index 7dd51b5..3ea3d7b 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
@@ -92,14 +92,6 @@ public class ResidualEvaluator implements Serializable {
   private final PartitionSpec spec;
   private final Expression expr;
   private final boolean caseSensitive;
-  private transient ThreadLocal<ResidualVisitor> visitors = null;
-
-  private ResidualVisitor visitor() {
-    if (visitors == null) {
-      this.visitors = ThreadLocal.withInitial(ResidualVisitor::new);
-    }
-    return visitors.get();
-  }
 
   private ResidualEvaluator(PartitionSpec spec, Expression expr, boolean 
caseSensitive) {
     this.spec = spec;
@@ -114,7 +106,7 @@ public class ResidualEvaluator implements Serializable {
    * @return the residual of this evaluator's expression from the partition 
values
    */
   public Expression residualFor(StructLike partitionData) {
-    return visitor().eval(partitionData);
+    return new ResidualVisitor().eval(partitionData);
   }
 
   private class ResidualVisitor extends BoundExpressionVisitor<Expression> {
diff --git a/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
index 638eb25..8fd0b60 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
@@ -49,14 +49,6 @@ public class StrictMetricsEvaluator {
   private final Schema schema;
   private final StructType struct;
   private final Expression expr;
-  private transient ThreadLocal<MetricsEvalVisitor> visitors = null;
-
-  private MetricsEvalVisitor visitor() {
-    if (visitors == null) {
-      this.visitors = ThreadLocal.withInitial(MetricsEvalVisitor::new);
-    }
-    return visitors.get();
-  }
 
   public StrictMetricsEvaluator(Schema schema, Expression unbound) {
     this.schema = schema;
@@ -72,7 +64,7 @@ public class StrictMetricsEvaluator {
    */
   public boolean eval(ContentFile<?> file) {
     // TODO: detect the case where a column is missing from the file using 
file's max field id.
-    return visitor().eval(file);
+    return new MetricsEvalVisitor().eval(file);
   }
 
   private static final boolean ROWS_MUST_MATCH = true;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index 0a3ade8..c139521 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -48,14 +48,6 @@ import org.apache.parquet.schema.PrimitiveType;
 
 public class ParquetDictionaryRowGroupFilter {
   private final Expression expr;
-  private transient ThreadLocal<EvalVisitor> visitors = null;
-
-  private EvalVisitor visitor() {
-    if (visitors == null) {
-      this.visitors = ThreadLocal.withInitial(EvalVisitor::new);
-    }
-    return visitors.get();
-  }
 
   public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
     this(schema, unbound, true);
@@ -75,7 +67,7 @@ public class ParquetDictionaryRowGroupFilter {
    */
   public boolean shouldRead(MessageType fileSchema, BlockMetaData rowGroup,
                             DictionaryPageReadStore dictionaries) {
-    return visitor().eval(fileSchema, rowGroup, dictionaries);
+    return new EvalVisitor().eval(fileSchema, rowGroup, dictionaries);
   }
 
   private static final boolean ROWS_MIGHT_MATCH = true;
@@ -116,18 +108,7 @@ public class ParquetDictionaryRowGroupFilter {
         }
       }
 
-      try {
-        return ExpressionVisitors.visitEvaluator(expr, this);
-
-      } finally {
-        // allow temporary state to be collected because this is in a 
thread-local
-        this.dictionaries = null;
-        this.dictCache = null;
-        this.isFallback = null;
-        this.mayContainNulls = null;
-        this.cols = null;
-        this.conversions = null;
-      }
+      return ExpressionVisitors.visitEvaluator(expr, this);
     }
 
     @Override
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index 3fdf905..56292f2 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -49,14 +49,6 @@ import org.apache.parquet.schema.PrimitiveType;
 public class ParquetMetricsRowGroupFilter {
   private final Schema schema;
   private final Expression expr;
-  private transient ThreadLocal<MetricsEvalVisitor> visitors = null;
-
-  private MetricsEvalVisitor visitor() {
-    if (visitors == null) {
-      this.visitors = ThreadLocal.withInitial(MetricsEvalVisitor::new);
-    }
-    return visitors.get();
-  }
 
   public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
     this(schema, unbound, true);
@@ -76,7 +68,7 @@ public class ParquetMetricsRowGroupFilter {
    * @return false if the file cannot contain rows that match the expression, 
true otherwise.
    */
   public boolean shouldRead(MessageType fileSchema, BlockMetaData rowGroup) {
-    return visitor().eval(fileSchema, rowGroup);
+    return new MetricsEvalVisitor().eval(fileSchema, rowGroup);
   }
 
   private static final boolean ROWS_MIGHT_MATCH = true;

Reply via email to