Repository: carbondata Updated Branches: refs/heads/master e622fa998 -> 74a2ddee9
[CARBONDATA-3056] Added concurrent reading through SDK Added an API CarbonReader.split to enable concurrent reading of carbondata files through SDK. This closes #2850 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/74a2ddee Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/74a2ddee Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/74a2ddee Branch: refs/heads/master Commit: 74a2ddee93c8f44159a6035d3cf3b9906a78b03f Parents: e622fa9 Author: Naman Rastogi <naman.rastogi...@gmail.com> Authored: Thu Oct 18 18:24:23 2018 +0530 Committer: kunal642 <kunalkapoor...@gmail.com> Committed: Fri Nov 2 17:41:41 2018 +0530 ---------------------------------------------------------------------- docs/sdk-guide.md | 20 +++ .../carbondata/sdk/file/CarbonReader.java | 52 ++++++ .../sdk/file/ConcurrentSdkReaderTest.java | 159 +++++++++++++++++++ 3 files changed, 231 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/74a2ddee/docs/sdk-guide.md ---------------------------------------------------------------------- diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md index 0ee1524..cb34627 100644 --- a/docs/sdk-guide.md +++ b/docs/sdk-guide.md @@ -591,6 +591,26 @@ Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/ ``` ``` +/** + * Breaks the list of CarbonRecordReader in CarbonReader into multiple + * CarbonReader objects, each iterating through some 'carbondata' files + * and return that list of CarbonReader objects + * + * If the no. of files is greater than maxSplits, then break the + * CarbonReader into maxSplits splits, with each split iterating + * through >= 1 file. + * + * If the no. of files is less than maxSplits, then return list of + * CarbonReader with size as the no. 
of files, with each CarbonReader + * iterating through exactly one file + * + * @param maxSplits: Int + * @return list of CarbonReader objects + */ + public List<CarbonReader> split(int maxSplits); +``` + +``` /** * Return true if has next row */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/74a2ddee/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java index a381429..1a55a2e 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java @@ -18,6 +18,7 @@ package org.apache.carbondata.sdk.file; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -115,6 +116,57 @@ public class CarbonReader<T> { } /** + * Breaks the list of CarbonRecordReader in CarbonReader into multiple + * CarbonReader objects, each iterating through some 'carbondata' files + * and return that list of CarbonReader objects + * + * If the no. of files is greater than maxSplits, then break the + * CarbonReader into maxSplits splits, with each split iterating + * through >= 1 file. + * + * If the no. of files is less than maxSplits, then return list of + * CarbonReader with size as the no. 
of files, with each CarbonReader + * iterating through exactly one file + * + * @param maxSplits: Int + * @return list of {@link CarbonReader} objects + */ + public List<CarbonReader> split(int maxSplits) throws IOException { + validateReader(); + if (maxSplits < 1) { + throw new RuntimeException( + this.getClass().getSimpleName() + ".split: maxSplits must be positive"); + } + + List<CarbonReader> carbonReaders = new ArrayList<>(); + + if (maxSplits < this.readers.size()) { + // If maxSplits is less than the no. of files + // Split the reader into maxSplits splits with each + // element containing >= 1 CarbonRecordReader objects + float filesPerSplit = (float) this.readers.size() / maxSplits; + for (int i = 0; i < maxSplits; ++i) { + carbonReaders.add(new CarbonReader<>(this.readers.subList( + (int) Math.ceil(i * filesPerSplit), + (int) Math.ceil(((i + 1) * filesPerSplit))))); + } + } else { + // If maxSplits is greater than the no. of files + // Split the reader into <num_files> splits with each + // element contains exactly 1 CarbonRecordReader object + for (int i = 0; i < this.readers.size(); ++i) { + carbonReaders.add(new CarbonReader<>(this.readers.subList(i, i + 1))); + } + } + + // This is to disable the use of this CarbonReader object to iterate + // over the files and forces user to only use the returned splits + this.initialise = false; + + return carbonReaders; + } + + /** * Close reader * * @throws IOException http://git-wip-us.apache.org/repos/asf/carbondata/blob/74a2ddee/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java new file mode 100644 index 0000000..fef3319 --- /dev/null +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java @@ 
-0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.carbondata.core.metadata.datatype.DataTypes; + +import org.apache.commons.io.FileUtils; +import org.junit.*; + +/** + * multi-thread Test suite for {@link CarbonReader} + */ +public class ConcurrentSdkReaderTest { + + private static final String dataDir = "./testReadFiles"; + + @Before @After public void cleanTestData() { + try { + FileUtils.deleteDirectory(new File(dataDir)); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + private void writeDataMultipleFiles(int numFiles, long numRowsPerFile) { + Field[] fields = new Field[2]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("intField", DataTypes.INT); + + for (int numFile = 0; numFile < numFiles; ++numFile) { + CarbonWriterBuilder builder = + 
CarbonWriter.builder().outputPath(dataDir).withCsvInput(new Schema(fields)) + .writtenBy("ConcurrentSdkReaderTest"); + + try { + CarbonWriter writer = builder.build(); + + for (long i = 0; i < numRowsPerFile; ++i) { + writer.write(new String[] { "robot_" + i, String.valueOf(i) }); + } + writer.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + } + + @Test public void testReadParallely() throws IOException, InterruptedException { + int numFiles = 10; + int numRowsPerFile = 10; + short numThreads = 4; + writeDataMultipleFiles(numFiles, numRowsPerFile); + long count; + + // Sequential Reading + CarbonReader reader = CarbonReader.builder(dataDir).build(); + try { + count = 0; + long start = System.currentTimeMillis(); + while (reader.hasNext()) { + reader.readNextRow(); + count += 1; + } + long end = System.currentTimeMillis(); + System.out.println("[Sequential read] Time: " + (end - start) + " ms"); + Assert.assertEquals(numFiles * numRowsPerFile, count); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } finally { + reader.close(); + } + + // Concurrent Reading + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + try { + CarbonReader reader2 = CarbonReader.builder(dataDir).build(); + List<CarbonReader> multipleReaders = reader2.split(numThreads); + try { + List<ReadLogic> tasks = new ArrayList<>(); + List<Future<Long>> results; + count = 0; + + for (CarbonReader reader_i : multipleReaders) { + tasks.add(new ReadLogic(reader_i)); + } + long start = System.currentTimeMillis(); + results = executorService.invokeAll(tasks); + for (Future result_i : results) { + count += (long) result_i.get(); + } + long end = System.currentTimeMillis(); + System.out.println("[Parallel read] Time: " + (end - start) + " ms"); + Assert.assertEquals(numFiles * numRowsPerFile, count); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } catch 
(Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } finally { + executorService.shutdown(); + executorService.awaitTermination(10, TimeUnit.MINUTES); + } + } + + class ReadLogic implements Callable<Long> { + CarbonReader reader; + + ReadLogic(CarbonReader reader) { + this.reader = reader; + } + + @Override public Long call() throws IOException { + long count = 0; + try { + while (reader.hasNext()) { + reader.readNextRow(); + count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } finally { + reader.close(); + } + return count; + } + } + +} \ No newline at end of file