thomasmueller commented on code in PR #587:
URL: https://github.com/apache/jackrabbit-oak/pull/587#discussion_r891166140


##########
oak-commons/pom.xml:
##########
@@ -82,6 +82,11 @@
       <artifactId>org.osgi.annotation</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>

Review Comment:
   License + copyright: Apache License, Version 2.0, January 2004
   https://github.com/lz4/lz4-java/blob/master/LICENSE.txt



##########
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java:
##########
@@ -88,6 +92,10 @@ public static void sort(File input, File output) throws 
IOException {
     * Defines the default maximum memory to be used while sorting (8 MB)
     */
     static final long DEFAULT_MAX_MEM_BYTES = 8388608L;
+
+    public enum compressionType {

Review Comment:
   Should be CompressionType



##########
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java:
##########
@@ -387,20 +468,28 @@ public static File sortAndSave(List<String> tmplist,
      * @param tmpdirectory
 *            location of the temporary files (set to null for default location)
      * @param distinct
+     *            Pass <code>true</code> if duplicate lines should be 
discarded. ([email protected])
+     * @param useCompression
+     *            enable compression for temporary files
+     * @param type
+     *            use gzip or lz4 as compression algorithm
      * @param typeToString
      *        function to map string to custom type. User for coverting line 
to custom type for the
      *        purpose of sorting
      */
     public static <T> File sortAndSave(List<T> tmplist,
                                    Comparator<T> cmp, Charset cs, File 
tmpdirectory,
-                                   boolean distinct, boolean usegzip, 
Function<T, String> typeToString) throws IOException {
+                                   boolean distinct, boolean useCompression, 
compressionType type,
+                                   Function<T, String> typeToString) throws 
IOException {
         Collections.sort(tmplist, cmp);
         File newtmpfile = File.createTempFile("sortInBatch",
                 "flatfile", tmpdirectory);
         newtmpfile.deleteOnExit();
         OutputStream out = new FileOutputStream(newtmpfile);
         int zipBufferSize = 2048;
-        if (usegzip) {
+        if (useCompression && type == compressionType.LZ4) {

Review Comment:
   I would use:
   
       if (useCompression) {
           switch (type) {
                ..
           }
       }
   
   or
   
       if (useCompression) {
           if (type == compressionType.LZ4) {
                ..
           } else if (...) {
               int zipBufferSize = 2048;
                ..
           }    
       }
   



##########
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java:
##########
@@ -387,20 +468,28 @@ public static File sortAndSave(List<String> tmplist,
      * @param tmpdirectory
 *            location of the temporary files (set to null for default location)
      * @param distinct
+     *            Pass <code>true</code> if duplicate lines should be 
discarded. ([email protected])
+     * @param useCompression
+     *            enable compression for temporary files
+     * @param type
+     *            use gzip or lz4 as compression algorithm
      * @param typeToString
      *        function to map string to custom type. User for coverting line 
to custom type for the
      *        purpose of sorting
      */
     public static <T> File sortAndSave(List<T> tmplist,
                                    Comparator<T> cmp, Charset cs, File 
tmpdirectory,
-                                   boolean distinct, boolean usegzip, 
Function<T, String> typeToString) throws IOException {
+                                   boolean distinct, boolean useCompression, 
compressionType type,
+                                   Function<T, String> typeToString) throws 
IOException {
         Collections.sort(tmplist, cmp);
         File newtmpfile = File.createTempFile("sortInBatch",
                 "flatfile", tmpdirectory);
         newtmpfile.deleteOnExit();
         OutputStream out = new FileOutputStream(newtmpfile);
         int zipBufferSize = 2048;

Review Comment:
   This line should be moved to (the only place) where it is used.



##########
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/progress/IndexingProgressReporter.java:
##########
@@ -229,7 +229,9 @@ public IndexUpdateState(String indexPath, boolean reindex, 
long estimatedCount)
         }
 
         public void indexUpdate() throws CommitFailedException {
-            updateCount++;
+            synchronized (this) {
+                updateCount++;

Review Comment:
   This doesn't seem right... I don't think synchronizing just the increment 
operation will ensure the count is correct... I wouldn't synchronize, as the 
count doesn't need to be correct. If it needs to be correct, then AtomicInteger 
would need to be used.



##########
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java:
##########
@@ -584,17 +736,49 @@ public static <T> int mergeSortedFiles(List<File> files,
                                            BufferedWriter fbw, final 
Comparator<T> cmp, Charset cs, boolean distinct,
                                            boolean usegzip, Function<T, 
String> typeToString,
                                            Function<String, T> stringToType) 
throws IOException {
+        return mergeSortedFiles(files, fbw, cmp, cs, distinct, usegzip, 
compressionType.GZIP, typeToString, stringToType);
+    }
+
+    /**
+     * This merges a bunch of temporary flat files and deletes them on success 
or error.
+     *
+     * @param files
+     *            The {@link List} of sorted {@link File}s to be merged.
+     * @param fbw
+     *            Buffered writer used to store the sorted content
+     * @param cmp
+     *            The {@link Comparator} to use to compare {@link String}s.
+     * @param cs
+     *            The {@link Charset} to be used for the byte to character 
conversion.
+     * @param distinct
+     *            Pass <code>true</code> if duplicate lines should be 
discarded. ([email protected])
+     * @param useCompression
+     *            enable compression for temporary files
+     * @param type
+     *            use gzip or lz4 as compression algorithm
+     * @param typeToString
+     *            function to map string to custom type. User for coverting 
line to custom type for the
+     *            purpose of sorting
+     * @param stringToType
+     *          function to map custom type to string. Used for storing sorted 
content back to file
+     * @since v0.1.4
+     */
+    public static <T> int mergeSortedFiles(List<File> files,
+                                           BufferedWriter fbw, final 
Comparator<T> cmp, Charset cs, boolean distinct,
+                                           boolean useCompression, 
compressionType type, Function<T, String> typeToString,
+                                           Function<String, T> stringToType) 
throws IOException {
         ArrayList<BinaryFileBuffer<T>> bfbs = new ArrayList<>();
         try {
             for (File f : files) {
                 final int bufferSize = 2048;
                 InputStream in = new FileInputStream(f);
                 BufferedReader br;
-                if (usegzip) {
-                    br = new BufferedReader(new InputStreamReader(new 
GZIPInputStream(in, bufferSize), cs));
-                } else {
-                    br = new BufferedReader(new InputStreamReader(in, cs));
+                if (useCompression && type == compressionType.LZ4) {

Review Comment:
   Same as above: nest the compression type check inside `if (useCompression) { ... }`.



##########
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java:
##########
@@ -584,17 +736,49 @@ public static <T> int mergeSortedFiles(List<File> files,
                                            BufferedWriter fbw, final 
Comparator<T> cmp, Charset cs, boolean distinct,
                                            boolean usegzip, Function<T, 
String> typeToString,
                                            Function<String, T> stringToType) 
throws IOException {
+        return mergeSortedFiles(files, fbw, cmp, cs, distinct, usegzip, 
compressionType.GZIP, typeToString, stringToType);
+    }
+
+    /**
+     * This merges a bunch of temporary flat files and deletes them on success 
or error.
+     *
+     * @param files
+     *            The {@link List} of sorted {@link File}s to be merged.
+     * @param fbw
+     *            Buffered writer used to store the sorted content
+     * @param cmp
+     *            The {@link Comparator} to use to compare {@link String}s.
+     * @param cs
+     *            The {@link Charset} to be used for the byte to character 
conversion.
+     * @param distinct
+     *            Pass <code>true</code> if duplicate lines should be 
discarded. ([email protected])
+     * @param useCompression
+     *            enable compression for temporary files
+     * @param type
+     *            use gzip or lz4 as compression algorithm
+     * @param typeToString
+     *            function to map string to custom type. User for coverting 
line to custom type for the
+     *            purpose of sorting
+     * @param stringToType
+     *          function to map custom type to string. Used for storing sorted 
content back to file
+     * @since v0.1.4
+     */
+    public static <T> int mergeSortedFiles(List<File> files,
+                                           BufferedWriter fbw, final 
Comparator<T> cmp, Charset cs, boolean distinct,
+                                           boolean useCompression, 
compressionType type, Function<T, String> typeToString,
+                                           Function<String, T> stringToType) 
throws IOException {
         ArrayList<BinaryFileBuffer<T>> bfbs = new ArrayList<>();
         try {
             for (File f : files) {
                 final int bufferSize = 2048;
                 InputStream in = new FileInputStream(f);
                 BufferedReader br;
-                if (usegzip) {
-                    br = new BufferedReader(new InputStreamReader(new 
GZIPInputStream(in, bufferSize), cs));
-                } else {
-                    br = new BufferedReader(new InputStreamReader(in, cs));
+                if (useCompression && type == compressionType.LZ4) {
+                    in = new LZ4FrameInputStream(in);
+                } else if (useCompression && type == compressionType.GZIP) {
+                    in = new GZIPInputStream(in, bufferSize);

Review Comment:
   Now, "in" is the possibly compressed stream... it used to be the raw stream. 
But I think it doesn't make a difference... could you verify?



##########
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java:
##########
@@ -88,6 +92,10 @@ public static void sort(File input, File output) throws 
IOException {
     * Defines the default maximum memory to be used while sorting (8 MB)
     */
     static final long DEFAULT_MAX_MEM_BYTES = 8388608L;
+
+    public enum compressionType {

Review Comment:
   I wonder if we should add "CompressionType.NONE" to simplify things a bit...



##########
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java:
##########
@@ -584,17 +736,49 @@ public static <T> int mergeSortedFiles(List<File> files,
                                            BufferedWriter fbw, final 
Comparator<T> cmp, Charset cs, boolean distinct,
                                            boolean usegzip, Function<T, 
String> typeToString,
                                            Function<String, T> stringToType) 
throws IOException {
+        return mergeSortedFiles(files, fbw, cmp, cs, distinct, usegzip, 
compressionType.GZIP, typeToString, stringToType);
+    }
+
+    /**
+     * This merges a bunch of temporary flat files and deletes them on success 
or error.
+     *
+     * @param files
+     *            The {@link List} of sorted {@link File}s to be merged.
+     * @param fbw
+     *            Buffered writer used to store the sorted content
+     * @param cmp
+     *            The {@link Comparator} to use to compare {@link String}s.
+     * @param cs
+     *            The {@link Charset} to be used for the byte to character 
conversion.
+     * @param distinct
+     *            Pass <code>true</code> if duplicate lines should be 
discarded. ([email protected])
+     * @param useCompression
+     *            enable compression for temporary files
+     * @param type
+     *            use gzip or lz4 as compression algorithm
+     * @param typeToString
+     *            function to map string to custom type. User for coverting 
line to custom type for the
+     *            purpose of sorting
+     * @param stringToType
+     *          function to map custom type to string. Used for storing sorted 
content back to file
+     * @since v0.1.4
+     */
+    public static <T> int mergeSortedFiles(List<File> files,
+                                           BufferedWriter fbw, final 
Comparator<T> cmp, Charset cs, boolean distinct,
+                                           boolean useCompression, 
compressionType type, Function<T, String> typeToString,
+                                           Function<String, T> stringToType) 
throws IOException {
         ArrayList<BinaryFileBuffer<T>> bfbs = new ArrayList<>();
         try {
             for (File f : files) {
                 final int bufferSize = 2048;

Review Comment:
   Same as above: this line should be moved to the only place where it is used.



##########
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/progress/IndexingProgressReporter.java:
##########
@@ -229,7 +229,9 @@ public IndexUpdateState(String indexPath, boolean reindex, 
long estimatedCount)
         }
 
         public void indexUpdate() throws CommitFailedException {
-            updateCount++;
+            synchronized (this) {
+                updateCount++;
+            }
             if (updateCount % 10000 == 0) {
                 log.info("{} => Indexed {} nodes in {} ...", indexPath, 
updateCount, watch);
                 watch.reset().start();

Review Comment:
   watch.reset() might not be thread-safe, so this _should_ be synchronized:
   
       updateCount++;
       if (updateCount % 10000 == 0) {
           synchronized (this) {
               log.info("{} => Indexed {} nodes in {} ...", indexPath, 
updateCount, watch);
               watch.reset().start();
               ...
           }
       }



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to