[ https://issues.apache.org/jira/browse/PARQUET-1456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691631#comment-16691631 ]

ASF GitHub Bot commented on PARQUET-1456:
-----------------------------------------

zivanfi closed pull request #548: PARQUET-1456: Use page index, ParquetFileReader throw ArrayIndexOutOfBoundsException
URL: https://github.com/apache/parquet-mr/pull/548

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
index b28fddee4..e28a38041 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -22,15 +22,12 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.EnumMap;
 import java.util.Formatter;
 import java.util.List;
-import java.util.Map;
 import java.util.PrimitiveIterator;
 import java.util.function.IntPredicate;
 
 import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.filter2.predicate.Operators.And;
 import org.apache.parquet.filter2.predicate.Operators.Eq;
 import org.apache.parquet.filter2.predicate.Operators.Gt;
@@ -42,11 +39,11 @@
 import org.apache.parquet.filter2.predicate.Operators.NotEq;
 import org.apache.parquet.filter2.predicate.Operators.Or;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveComparator;
 import org.apache.parquet.schema.PrimitiveStringifier;
 import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
 import it.unimi.dsi.fastutil.booleans.BooleanList;
@@ -394,8 +391,6 @@ int sizeOf(Object value) {
     }
   };
 
-  private static final Map<PrimitiveTypeName, ColumnIndexBuilder> BUILDERS = new EnumMap<>(PrimitiveTypeName.class);
-
   private PrimitiveType type;
   private final BooleanList nullPages = new BooleanArrayList();
   private final LongList nullCounts = new LongArrayList();
@@ -469,12 +464,7 @@ public static ColumnIndex build(
       List<ByteBuffer> minValues,
       List<ByteBuffer> maxValues) {
 
-    PrimitiveTypeName typeName = type.getPrimitiveTypeName();
-    ColumnIndexBuilder builder = BUILDERS.get(typeName);
-    if (builder == null) {
-      builder = createNewBuilder(type, Integer.MAX_VALUE);
-      BUILDERS.put(typeName, builder);
-    }
+    ColumnIndexBuilder builder = createNewBuilder(type, Integer.MAX_VALUE);
 
     builder.fill(nullPages, nullCounts, minValues, maxValues);
     ColumnIndexBase<?> columnIndex = builder.build(type);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMultipleWriteRead.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMultipleWriteRead.java
new file mode 100644
index 000000000..4a6bd3b07
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMultipleWriteRead.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.io.api.Binary.fromString;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+/**
+ * Tests writing/reading multiple files at the same time (using multiple threads). Readers/writers do not support
+ * concurrency, but the API shall support using separate reader/writer instances to read/write parquet files in different
+ * threads. (Of course, simultaneous writing to the same file is not supported.)
+ */
+public class TestMultipleWriteRead {
+  private static final MessageType SCHEMA = Types.buildMessage()
+      .required(INT32).named("id")
+      .required(BINARY).as(stringType()).named("name")
+      .requiredList().requiredElement(INT64).as(intType(64, false)).named("phone_numbers")
+      .optional(BINARY).as(stringType()).named("comment")
+      .named("msg");
+  private static final Comparator<Binary> BINARY_COMPARATOR = Types.required(BINARY).as(stringType()).named("dummy")
+      .comparator();
+
+  private static class DataGenerator implements Supplier<Group> {
+    private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz -";
+    private static final int NAME_MIN_SIZE = 5;
+    private static final int NAME_MAX_SIZE = 30;
+    private static final int PHONE_NUMBERS_MAX_SIZE = 5;
+    private static final long MIN_PHONE_NUMBER = 36_1_000_000;
+    private static final long MAX_PHONE_NUMBER = 36_1_999_999;
+    private static final double COMMENT_NULL_RATIO = 0.3;
+    private static final int COMMENT_MAX_SIZE = 200;
+
+    private final Random random;
+    private final GroupFactory factory = new SimpleGroupFactory(SCHEMA);
+
+    DataGenerator(long seed) {
+      random = new Random(seed);
+    }
+
+    private String getString(int minSize, int maxSize) {
+      int size = random.nextInt(maxSize - minSize) + minSize;
+      StringBuilder builder = new StringBuilder(size);
+      for (int i = 0; i < size; ++i) {
+        builder.append(ALPHABET.charAt(random.nextInt(ALPHABET.length())));
+      }
+      return builder.toString();
+    }
+
+    @Override
+    public Group get() {
+      Group group = factory.newGroup();
+      group.add("id", random.nextInt());
+      group.add("name", getString(NAME_MIN_SIZE, NAME_MAX_SIZE));
+      Group phoneNumbers = group.addGroup("phone_numbers");
+      for (int i = 0, n = random.nextInt(PHONE_NUMBERS_MAX_SIZE); i < n; ++i) {
+        Group phoneNumber = phoneNumbers.addGroup(0);
+        phoneNumber.add(0, random.nextLong() % (MAX_PHONE_NUMBER - MIN_PHONE_NUMBER) + MIN_PHONE_NUMBER);
+      }
+      if (random.nextDouble() >= COMMENT_NULL_RATIO) {
+        group.add("comment", getString(0, COMMENT_MAX_SIZE));
+      }
+      return group;
+    }
+  }
+
+  private static Path tmpDir;
+
+  @BeforeClass
+  public static void createTmpDir() {
+    tmpDir = new Path(Files.createTempDir().getAbsolutePath().toString());
+  }
+
+  @AfterClass
+  public static void deleteTmpDir() throws IOException {
+    tmpDir.getFileSystem(new Configuration()).delete(tmpDir, true);
+  }
+
+  private Path writeFile(Iterable<Group> data) throws IOException {
+    Path file = new Path(tmpDir, "testMultipleReadWrite_" + UUID.randomUUID() + ".parquet");
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        .config(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, SCHEMA.toString())
+        .build()) {
+      for (Group group : data) {
+        writer.write(group);
+      }
+    }
+    return file;
+  }
+
+  private void validateFile(Path file, List<Group> data) throws IOException {
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
+      for (Group group : data) {
+        assertEquals(group.toString(), reader.read().toString());
+      }
+    }
+  }
+
+  private void validateFile(Path file, Filter filter, Stream<Group> data) throws IOException {
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+        .withFilter(filter)
+        .build()) {
+      for (Iterator<Group> it = data.iterator(); it.hasNext();) {
+        assertEquals(it.next().toString(), reader.read().toString());
+      }
+    }
+  }
+
+  private void validateFileWithIdFilter(Path file, List<Group> data) throws IOException {
+    validateFile(file, FilterCompat.get(eq(intColumn("id"), 0)),
+        data.stream().filter(group -> group.getInteger("id", 0) == 0));
+  }
+
+  private void validateFileWithCommentFilter(Path file, List<Group> data) throws IOException {
+    validateFile(file, FilterCompat.get(eq(binaryColumn("comment"), null)),
+        data.stream().filter(group -> group.getFieldRepetitionCount("comment") == 0));
+  }
+
+  private void validateFileWithComplexFilter(Path file, List<Group> data) throws IOException {
+    Binary binaryValueB = fromString("b");
+    Filter filter = FilterCompat.get(
+        and(
+            gtEq(intColumn("id"), 0),
+            and(
+                lt(binaryColumn("name"), binaryValueB),
+                notEq(binaryColumn("comment"), null))));
+    Predicate<Group> predicate = group -> group.getInteger("id", 0) >= 0
+        && BINARY_COMPARATOR.compare(group.getBinary("name", 0), binaryValueB) < 0
+        && group.getFieldRepetitionCount("comment") > 0;
+    validateFile(file, filter, data.stream().filter(predicate));
+  }
+
+  @Test
+  public void testWriteRead() throws Throwable {
+    // 10 random datasets with row counts from 10000 down to 1000
+    List<List<Group>> data = new ArrayList<>();
+    for (int i = 0; i < 10; ++i) {
+      data.add(Stream.generate(new DataGenerator(i)).limit(10000 - i * 1000).collect(Collectors.toList()));
+    }
+
+    // Writes (and reads back the data to validate) the random values using 6 threads
+    List<Future<Path>> futureFiles = new ArrayList<>();
+    ExecutorService exec = Executors.newFixedThreadPool(6);
+    for (List<Group> d : data) {
+      futureFiles.add(exec.submit(() -> {
+        Path file = writeFile(d);
+        validateFile(file, d);
+        return file;
+      }));
+    }
+    List<Path> files = new ArrayList<>();
+    for (Future<Path> future : futureFiles) {
+      try {
+        files.add(future.get());
+      } catch (ExecutionException e) {
+        throw e.getCause();
+      }
+    }
+
+    // Executes 3 filterings on each file using 6 threads
+    List<Future<?>> futures = new ArrayList<>();
+    for (int i = 0; i < 10; ++i) {
+      Path file = files.get(i);
+      List<Group> d = data.get(i);
+      futures.add(exec.submit(() -> {
+        validateFileWithIdFilter(file, d);
+        return null;
+      }));
+      futures.add(exec.submit(() -> {
+        validateFileWithCommentFilter(file, d);
+        return null;
+      }));
+      futures.add(exec.submit(() -> {
+        validateFileWithComplexFilter(file, d);
+        return null;
+      }));
+    }
+    for (Future<?> future : futures) {
+      try {
+        future.get();
+      } catch (ExecutionException e) {
+        throw e.getCause();
+      }
+    }
+  }
+}
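
The removed static BUILDERS cache above is the crux of the fix: every thread calling ColumnIndexBuilder.build() used to share one mutable builder per primitive type, with no synchronization. The following stand-alone sketch (not part of the PR; all names in it are hypothetical stand-ins, not Parquet classes) reproduces the same hazard with a plain ArrayList, and will typically fail with an uncaught IndexOutOfBoundsException once the two threads interleave:

import java.util.ArrayList;
import java.util.List;

public class SharedBuilderRace {
  // Analogue of the removed static cache: one mutable "builder" shared by all threads.
  private static final List<Long> SHARED_STATE = new ArrayList<>();

  static long build(int pages) {
    SHARED_STATE.clear();                 // one thread's fill() ...
    for (int i = 0; i < pages; ++i) {
      SHARED_STATE.add((long) i);         // ... interleaves with another thread's fill()
    }
    long sum = 0;
    for (int i = 0; i < pages; ++i) {
      sum += SHARED_STATE.get(i);         // may read past the list's current size under races
    }
    return sum;
  }

  public static void main(String[] args) throws InterruptedException {
    Runnable task = () -> {
      for (int i = 0; i < 100_000; ++i) {
        build(100);
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }
}

The PR resolves the race by giving every build() call its own builder (createNewBuilder(type, Integer.MAX_VALUE)), which is exactly the isolation the sketch lacks.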


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Use page index, ParquetFileReader throw ArrayIndexOutOfBoundsException
> ----------------------------------------------------------------------
>
>                 Key: PARQUET-1456
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1456
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-mr
>            Reporter: yiming.xu
>            Assignee: Gabor Szadovszky
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> Hi, while using the page index feature on the master branch to adapt Spark, we found a concurrency
> problem: org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder#build
> caches ColumnIndexBuilder instances but does not lock them:
>     PrimitiveTypeName typeName = type.getPrimitiveTypeName();
>     ColumnIndexBuilder builder = BUILDERS.get(typeName);
>     if (builder == null) {
>       builder = createNewBuilder(type, Integer.MAX_VALUE);
>       BUILDERS.put(typeName, builder);
>     }
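
For reference, the merged fix simply drops this cache and calls createNewBuilder(type, Integer.MAX_VALUE) unconditionally (see the diff above). If per-type reuse were ever wanted back, one thread-safe variant would be a ThreadLocal cache. The sketch below only illustrates that idea; it uses hypothetical stand-in types rather than the real (package-private) Parquet classes, and is not what the PR does:

import java.util.EnumMap;
import java.util.Map;

public class ThreadLocalBuilderCache {
  // Hypothetical stand-ins so the sketch compiles on its own.
  enum TypeName { INT32, INT64, BINARY }

  static class Builder {
    final StringBuilder state = new StringBuilder(); // mutable, must stay thread-confined
  }

  // One EnumMap per thread: builders are reused within a thread but never shared across threads.
  private static final ThreadLocal<Map<TypeName, Builder>> BUILDERS =
      ThreadLocal.withInitial(() -> new EnumMap<>(TypeName.class));

  static Builder builderFor(TypeName type) {
    return BUILDERS.get().computeIfAbsent(type, t -> new Builder());
  }
}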



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
