This is an automated email from the ASF dual-hosted git repository. cstamas pushed a commit to branch MINDEXER-151-poc in repository https://gitbox.apache.org/repos/asf/maven-indexer.git
commit d6e8a38821635ad16a7534b4f4917218be3e2096 Author: Tamas Cservenak <[email protected]> AuthorDate: Thu Apr 28 12:38:49 2022 +0200 [MINDEXER-151] Proof of concept On my PC, this change (ingesting the raw records of the GZIP file into the Lucene index on multiple threads) halves the execution time: BasicUsageExample takes over 15 minutes to finish on master (when doing a full update), while with this PR it finishes in under 7 minutes. --- .../maven/index/updater/IndexDataReader.java | 144 ++++++++++++++++----- indexer-examples/indexer-examples-basic/pom.xml | 2 +- .../maven/index/examples/BasicUsageExample.java | 8 +- 3 files changed, 122 insertions(+), 32 deletions(-) diff --git a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java index b1c4237..e4beae9 100644 --- a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java +++ b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java @@ -26,9 +26,17 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.UTFDataFormatException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; import java.util.Date; -import java.util.LinkedHashSet; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; import org.apache.lucene.document.Document; @@ -39,6 +47,8 @@ import org.apache.lucene.index.IndexWriter; import org.apache.maven.index.ArtifactInfo; import org.apache.maven.index.context.IndexUtils; import org.apache.maven.index.context.IndexingContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An index data reader used to parse transfer
index format. @@ -47,10 +57,12 @@ import org.apache.maven.index.context.IndexingContext; */ public class IndexDataReader { + private static final Logger LOGGER = LoggerFactory.getLogger( IndexDataReader.class ); + private final DataInputStream dis; public IndexDataReader( final InputStream is ) - throws IOException + throws IOException { // MINDEXER-13 // LightweightHttpWagon may have performed automatic decompression @@ -72,8 +84,11 @@ public class IndexDataReader } public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context ) - throws IOException + throws IOException { + LOGGER.info( "Reading index..." ); + Instant start = Instant.now(); + long timestamp = readHeader(); Date date = null; @@ -87,45 +102,114 @@ public class IndexDataReader int n = 0; - Document doc; - Set<String> rootGroups = new LinkedHashSet<>(); - Set<String> allGroups = new LinkedHashSet<>(); + final Document END = new Document(); - while ( ( doc = readDocument() ) != null ) + ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>(); + ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>(); + ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>( 10000 ); + int threads = Runtime.getRuntime().availableProcessors() / 2; + ExecutorService executorService = Executors.newFixedThreadPool( threads ); + ArrayList<Exception> errors = new ArrayList<>(); + + for ( int i = 0; i < threads; i++ ) { - ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context ); - if ( ai != null ) - { - w.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) ); + executorService.execute( () -> { + LOGGER.info( "Starting thread {}", Thread.currentThread().getName() ); + try + { + while ( true ) + { + try + { + Document doc = queue.take(); + if ( doc == END ) + { + break; + } + addToIndex( doc, context, w, rootGroups, allGroups ); + } + catch ( InterruptedException | IOException e ) + { + errors.add( e ); + break; + } + } + } + finally + { + LOGGER.info( 
"Done thread {}", Thread.currentThread().getName() ); + } + } ); + } - rootGroups.add( ai.getRootGroup() ); - allGroups.add( ai.getGroupId() ); - } - else if ( doc.getField( ArtifactInfo.ALL_GROUPS ) != null - || doc.getField( ArtifactInfo.ROOT_GROUPS ) != null ) + try + { + Document doc; + while ( ( doc = readDocument() ) != null ) { - // skip it + queue.put( doc ); + n++; } - else + LOGGER.info( "Signalling END" ); + for ( int i = 0; i < threads; i++ ) { - w.addDocument( doc ); + queue.put( END ); } - n++; + + LOGGER.info( "Shutting down threads" ); + executorService.shutdown(); + executorService.awaitTermination( 5L, TimeUnit.MINUTES ); + } + catch ( InterruptedException e ) + { + throw new IOException( "Interrupted", e ); } + if ( !errors.isEmpty() ) + { + IOException exception = new IOException( "Error during load of index" ); + errors.forEach( exception::addSuppressed ); + throw exception; + } + + LOGGER.info( "Commit..." ); w.commit(); IndexDataReadResult result = new IndexDataReadResult(); result.setDocumentCount( n ); result.setTimestamp( date ); - result.setRootGroups( rootGroups ); - result.setAllGroups( allGroups ); + result.setRootGroups( rootGroups.keySet() ); + result.setAllGroups( allGroups.keySet() ); + LOGGER.info( "Reading index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() ); return result; } + private void addToIndex( final Document doc, final IndexingContext context, final IndexWriter indexWriter, + final ConcurrentMap<String, Boolean> rootGroups, + final ConcurrentMap<String, Boolean> allGroups ) + throws IOException + { + ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context ); + if ( ai != null ) + { + indexWriter.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) ); + + rootGroups.putIfAbsent( ai.getRootGroup(), Boolean.TRUE ); + allGroups.putIfAbsent( ai.getGroupId(), Boolean.TRUE ); + } + else + { + if ( doc.getField( ArtifactInfo.ALL_GROUPS ) == null + && doc.getField( 
ArtifactInfo.ROOT_GROUPS ) != null ) + { + indexWriter.addDocument( doc ); + } + } + } + public long readHeader() - throws IOException + throws IOException { final byte hdrbyte = (byte) ( ( IndexDataWriter.VERSION << 24 ) >> 24 ); @@ -139,7 +223,7 @@ public class IndexDataReader } public Document readDocument() - throws IOException + throws IOException { int fieldCount; try @@ -160,7 +244,7 @@ public class IndexDataReader // Fix up UINFO field wrt MINDEXER-41 final Field uinfoField = (Field) doc.getField( ArtifactInfo.UINFO ); - final String info = doc.get( ArtifactInfo.INFO ); + final String info = doc.get( ArtifactInfo.INFO ); if ( uinfoField != null && info != null && !info.isEmpty() ) { final String[] splitInfo = ArtifactInfo.FS_PATTERN.split( info ); @@ -179,7 +263,7 @@ public class IndexDataReader } private Field readField() - throws IOException + throws IOException { int flags = dis.read(); @@ -199,7 +283,7 @@ public class IndexDataReader } private static String readUTF( DataInput in ) - throws IOException + throws IOException { int utflen = in.readInt(); @@ -214,7 +298,7 @@ public class IndexDataReader catch ( OutOfMemoryError e ) { throw new IOException( "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!" 
- + " See MINDEXER-28 for more information!", e ); + + " See MINDEXER-28 for more information!", e ); } int c, char2, char3; @@ -282,7 +366,7 @@ public class IndexDataReader throw new UTFDataFormatException( "malformed input around byte " + ( count - 1 ) ); } chararr[chararrCount++] = - (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) ); + (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) ); break; default: @@ -360,7 +444,7 @@ public class IndexDataReader * @throws IOException in case of an IO exception during index file access */ public IndexDataReadResult readIndex( final IndexDataReadVisitor visitor, final IndexingContext context ) - throws IOException + throws IOException { dis.readByte(); // data format version diff --git a/indexer-examples/indexer-examples-basic/pom.xml b/indexer-examples/indexer-examples-basic/pom.xml index 1881758..7b45ed1 100644 --- a/indexer-examples/indexer-examples-basic/pom.xml +++ b/indexer-examples/indexer-examples-basic/pom.xml @@ -58,7 +58,7 @@ under the License. 
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> - <scope>test</scope> + <scope>runtime</scope> </dependency> <dependency> diff --git a/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java b/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java index 7d79508..722947d 100644 --- a/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java +++ b/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java @@ -68,6 +68,8 @@ import org.eclipse.aether.version.Version; import java.io.File; import java.io.IOException; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -142,14 +144,17 @@ public class BasicUsageExample // Preferred frequency is once a week. if ( true ) { + Instant updateStart = Instant.now(); System.out.println( "Updating Index..." ); System.out.println( "This might take a while on first run, so please be patient!" 
); // Create ResourceFetcher implementation to be used with IndexUpdateRequest // Here, we use Wagon based one as shorthand, but all we need is a ResourceFetcher implementation TransferListener listener = new AbstractTransferListener() { + Instant start; public void transferStarted( TransferEvent transferEvent ) { + start = Instant.now(); System.out.print( " Downloading " + transferEvent.getResource().getName() ); } @@ -159,7 +164,7 @@ public class BasicUsageExample public void transferCompleted( TransferEvent transferEvent ) { - System.out.println( " - Done" ); + System.out.println( " - Done in " + Duration.between( start, Instant.now() ).getSeconds() + " sec" ); } }; ResourceFetcher resourceFetcher = new WagonHelper.WagonFetcher( httpWagon, listener, null, null ); @@ -182,6 +187,7 @@ public class BasicUsageExample + updateResult.getTimestamp() + " period." ); } + System.out.println( "Finished in " + Duration.between( updateStart, Instant.now() ).getSeconds() + " sec" ); System.out.println(); }
