Author: tommaso
Date: Mon Aug 1 11:21:08 2016
New Revision: 1754735
URL: http://svn.apache.org/viewvc?rev=1754735&view=rev
Log:
SLING-5924 - applied patch from Simone Tripodi for using direct/indirect
ByteBuffer for resource package builder
Added:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java
(with props)
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStream.java
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java?rev=1754735&r1=1754734&r2=1754735&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java
(original)
+++
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java
Mon Aug 1 11:21:08 2016
@@ -39,6 +39,7 @@ import org.apache.sling.distribution.pac
import
org.apache.sling.distribution.serialization.DistributionContentSerializer;
import org.apache.sling.distribution.serialization.impl.vlt.VltUtils;
import org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream;
+import
org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream.MemoryUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,13 +52,22 @@ public class ResourceDistributionPackage
private final File tempDirectory;
private final DistributionContentSerializer distributionContentSerializer;
private final int fileThreshold;
+ private final MemoryUnit memoryUnit;
+ private final boolean useOffHeapMemory;
- public ResourceDistributionPackageBuilder(String type,
DistributionContentSerializer distributionContentSerializer, String
tempFilesFolder, int fileThreshold) {
+ public ResourceDistributionPackageBuilder(String type,
+ DistributionContentSerializer
distributionContentSerializer,
+ String tempFilesFolder,
+ int fileThreshold,
+ MemoryUnit memoryUnit,
+ boolean useOffHeapMemory) {
super(type);
this.distributionContentSerializer = distributionContentSerializer;
this.packagesPath = PREFIX_PATH + type + "/data";
this.tempDirectory = VltUtils.getTempFolder(tempFilesFolder);
this.fileThreshold = fileThreshold;
+ this.memoryUnit = memoryUnit;
+ this.useOffHeapMemory = useOffHeapMemory;
}
@Override
@@ -67,7 +77,7 @@ public class ResourceDistributionPackage
FileBackedMemoryOutputStream outputStream = null;
try {
try {
- outputStream = new FileBackedMemoryOutputStream(fileThreshold,
tempDirectory, "distrpck-create-", "." + getType());
+ outputStream = new FileBackedMemoryOutputStream(fileThreshold,
memoryUnit, useOffHeapMemory, tempDirectory, "distrpck-create-", "." +
getType());
distributionContentSerializer.exportToStream(resourceResolver,
request, outputStream);
outputStream.flush();
} finally {
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java?rev=1754735&r1=1754734&r2=1754735&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java
(original)
+++
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java
Mon Aug 1 11:21:08 2016
@@ -76,7 +76,7 @@ public class AsyncDeliveryDispatchingStr
for (String referenceQueueName : deliveryMappings.keySet()) {
DistributionQueue queue =
queueProvider.getQueue(referenceQueueName);
- if (queue.getStatus().getItemsCount() > 30) {
+ if (queue.getStatus().getItemsCount() > 100) {
// too many items in the queue, let's send actual packages and
references separately
distributionPackage.getInfo().put("reference-required", true);
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java?rev=1754735&r1=1754734&r2=1754735&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java
(original)
+++
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DistributionPackageBuilderFactory.java
Mon Aug 1 11:21:08 2016
@@ -18,11 +18,12 @@
*/
package org.apache.sling.distribution.serialization.impl;
-import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
import java.io.InputStream;
import java.util.Map;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -38,10 +39,11 @@ import org.apache.sling.distribution.com
import org.apache.sling.distribution.component.impl.SettingsUtils;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import
org.apache.sling.distribution.packaging.impl.FileDistributionPackageBuilder;
import
org.apache.sling.distribution.packaging.impl.ResourceDistributionPackageBuilder;
import
org.apache.sling.distribution.serialization.DistributionContentSerializer;
-import org.apache.sling.distribution.packaging.DistributionPackageInfo;
+import
org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream.MemoryUnit;
/**
* A factory for package builders
@@ -89,15 +91,39 @@ public class DistributionPackageBuilderF
private static final String TEMP_FS_FOLDER = "tempFsFolder";
// 1M
- private static final int DEFAULT_FILE_THRESHOLD_VALUE = 10240000;
+ private static final int DEFAULT_FILE_THRESHOLD_VALUE = 1;
@Property(
- label="File threshold (in bytes)",
+ label="File threshold",
description = "Once the data reaches the configurable size value,
buffering to memory switches to file buffering.",
intValue = DEFAULT_FILE_THRESHOLD_VALUE
)
public static final String FILE_THRESHOLD = "fileThreshold";
+ @Property(
+ label = "The memory unit for the file threshold",
+ description = "The memory unit for the file threshold, Megabytes by
default",
+ value = "MEGA_BYTES",
+ options = {
+ @PropertyOption(name = "BYTES", value = "Bytes"),
+ @PropertyOption(name = "KILO_BYTES", value = "Kilobytes"),
+ @PropertyOption(name = "MEGA_BYTES", value = "Megabytes"),
+ @PropertyOption(name = "GIGA_BYTES", value = "Gigabytes")
+ }
+ )
+ private static final String MEMORY_UNIT = "memoryUnit";
+
+ private static final String DEFAULT_MEMORY_UNIT = "MEGA_BYTES";
+
+ private static final boolean DEFAULT_USE_OFF_HEAP_MEMORY = false;
+
+ @Property(
+ label="Flag to enable/disable the off-heap memory",
+ description = "Flag to enable/disable the off-heap memory, false by
default",
+ boolValue = DEFAULT_USE_OFF_HEAP_MEMORY
+ )
+ public static final String USE_OFF_HEAP_MEMORY = "useOffHeapMemory";
+
private DistributionPackageBuilder packageBuilder;
@Activate
@@ -111,7 +137,10 @@ public class DistributionPackageBuilderF
packageBuilder = new
FileDistributionPackageBuilder(contentSerializer.getName(), contentSerializer,
tempFsFolder);
} else {
final int fileThreshold =
PropertiesUtil.toInteger(config.get(FILE_THRESHOLD),
DEFAULT_FILE_THRESHOLD_VALUE);
- packageBuilder = new
ResourceDistributionPackageBuilder(contentSerializer.getName(),
contentSerializer, tempFsFolder, fileThreshold);
+ String memoryUnitName =
PropertiesUtil.toString(config.get(MEMORY_UNIT), DEFAULT_MEMORY_UNIT);
+ final MemoryUnit memoryUnit = MemoryUnit.valueOf(memoryUnitName);
+ final boolean useOffHeapMemory =
PropertiesUtil.toBoolean(config.get(USE_OFF_HEAP_MEMORY),
DEFAULT_USE_OFF_HEAP_MEMORY);
+ packageBuilder = new
ResourceDistributionPackageBuilder(contentSerializer.getName(),
contentSerializer, tempFsFolder, fileThreshold, memoryUnit, useOffHeapMemory);
}
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java?rev=1754735&r1=1754734&r2=1754735&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java
(original)
+++
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/vlt/VaultDistributionPackageBuilderFactory.java
Mon Aug 1 11:21:08 2016
@@ -40,6 +40,7 @@ import org.apache.sling.distribution.com
import
org.apache.sling.distribution.component.impl.DistributionComponentConstants;
import org.apache.sling.distribution.component.impl.SettingsUtils;
import
org.apache.sling.distribution.serialization.DistributionContentSerializer;
+import
org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream.MemoryUnit;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
@@ -119,7 +120,7 @@ public class VaultDistributionPackageBui
public static final String AUTOSAVE_THRESHOLD = "autoSaveThreshold";
// 1M
- private static final int DEFAULT_FILE_THRESHOLD_VALUE = 10240000;
+ private static final int DEFAULT_FILE_THRESHOLD_VALUE = 1;
@Property(
label="File threshold (in bytes)",
@@ -128,6 +129,30 @@ public class VaultDistributionPackageBui
)
public static final String FILE_THRESHOLD = "fileThreshold";
+ @Property(
+ label = "The memory unit for the file threshold",
+ description = "The memory unit for the file threshold, Megabytes by
default",
+ value = "MEGA_BYTES",
+ options = {
+ @PropertyOption(name = "BYTES", value = "Bytes"),
+ @PropertyOption(name = "KILO_BYTES", value = "Kilobytes"),
+ @PropertyOption(name = "MEGA_BYTES", value = "Megabytes"),
+ @PropertyOption(name = "GIGA_BYTES", value = "Gigabytes")
+ }
+ )
+ private static final String MEMORY_UNIT = "MEGA_BYTES";
+
+ private static final String DEFAULT_MEMORY_UNIT = "MEGA_BYTES";
+
+ private static final boolean DEFAULT_USE_OFF_HEAP_MEMORY = false;
+
+ @Property(
+ label="Flag to enable/disable the off-heap memory",
+ description = "Flag to enable/disable the off-heap memory, false by
default",
+ boolValue = DEFAULT_USE_OFF_HEAP_MEMORY
+ )
+ public static final String USE_OFF_HEAP_MEMORY = "useOffHeapMemory";
+
@Reference
private Packaging packaging;
@@ -167,7 +192,10 @@ public class VaultDistributionPackageBui
packageBuilder = new FileDistributionPackageBuilder(name,
contentSerializer, tempFsFolder);
} else {
final int fileThreshold =
PropertiesUtil.toInteger(config.get(FILE_THRESHOLD),
DEFAULT_FILE_THRESHOLD_VALUE);
- packageBuilder = new ResourceDistributionPackageBuilder(name,
contentSerializer, tempFsFolder, fileThreshold);
+ String memoryUnitName =
PropertiesUtil.toString(config.get(MEMORY_UNIT), DEFAULT_MEMORY_UNIT);
+ final MemoryUnit memoryUnit = MemoryUnit.valueOf(memoryUnitName);
+ final boolean useOffHeapMemory =
PropertiesUtil.toBoolean(config.get(USE_OFF_HEAP_MEMORY),
DEFAULT_USE_OFF_HEAP_MEMORY);
+ packageBuilder = new
ResourceDistributionPackageBuilder(contentSerializer.getName(),
contentSerializer, tempFsFolder, fileThreshold, memoryUnit, useOffHeapMemory);
}
}
Added:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java?rev=1754735&view=auto
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java
(added)
+++
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java
Mon Aug 1 11:21:08 2016
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.util.impl;
+
+import static java.lang.Math.min;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+final class ByteBufferBackedInputStream extends InputStream {
+
+ private final ByteBuffer memory;
+
+ public ByteBufferBackedInputStream(ByteBuffer memory) {
+ this.memory = memory;
+ }
+
+ public int read() throws IOException {
+ if (!memory.hasRemaining()) {
+ return -1;
+ }
+ return memory.get() & 0xFF;
+ }
+
+ public int read(byte[] bytes, int off, int len) throws IOException {
+ if (!memory.hasRemaining()) {
+ return -1;
+ }
+
+ len = min(len, memory.remaining());
+ memory.get(bytes, off, len);
+ return len;
+ }
+
+}
Propchange:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStream.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStream.java?rev=1754735&r1=1754734&r2=1754735&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStream.java
(original)
+++
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStream.java
Mon Aug 1 11:21:08 2016
@@ -18,16 +18,18 @@
*/
package org.apache.sling.distribution.util.impl;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import static java.lang.Math.pow;
+import static java.io.File.createTempFile;
+import static java.nio.ByteBuffer.allocate;
+import static java.nio.ByteBuffer.allocateDirect;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-
-import static java.io.File.createTempFile;
+import java.nio.ByteBuffer;
/**
* {@link OutputStream} implementation which writes into a {@code byte[]}
until a certain {@link #fileThreshold} is
@@ -35,9 +37,22 @@ import static java.io.File.createTempFil
*/
public class FileBackedMemoryOutputStream extends OutputStream {
- private final int fileThreshold;
+ public enum MemoryUnit {
+
+ BYTES(1),
+ KILO_BYTES(1000),
+ MEGA_BYTES((int) pow(10, 6)),
+ GIGA_BYTES((int) pow(10, 9));
+
+ private final int memoryFactor;
- private final ByteArrayOutputStream memory;
+ private MemoryUnit(int memoryFactor) {
+ this.memoryFactor = memoryFactor;
+ }
+
+ };
+
+ private final ByteBuffer memory;
private final File tempDirectory;
@@ -49,9 +64,21 @@ public class FileBackedMemoryOutputStrea
private File file;
- public FileBackedMemoryOutputStream(int fileThreshold, File tempDirectory,
String fileName, String fileExtension) {
- this.fileThreshold = fileThreshold;
- this.memory = new ByteArrayOutputStream(fileThreshold);
+ public FileBackedMemoryOutputStream(int fileThreshold,
+ MemoryUnit memoryUnit,
+ boolean useOffHeapMemory,
+ File tempDirectory,
+ String fileName,
+ String fileExtension) {
+ if (fileThreshold < 0) {
+ throw new IllegalArgumentException("Negative fileThreshold size
has no semantic in this version.");
+ }
+ int threshold = fileThreshold * memoryUnit.memoryFactor;
+ if (useOffHeapMemory) {
+ memory = allocateDirect(threshold);
+ } else {
+ memory = allocate(threshold);
+ }
this.tempDirectory = tempDirectory;
this.fileName = fileName;
this.fileExtension = fileExtension;
@@ -59,25 +86,22 @@ public class FileBackedMemoryOutputStrea
@Override
public void write(int b) throws IOException {
- OutputStream current;
- if (memory.size() < fileThreshold) {
- current = memory;
+ if (memory.hasRemaining()) {
+ memory.put((byte) (b & 0xff));
} else {
if (out == null) {
file = createTempFile(fileName, fileExtension, tempDirectory);
out = new FileOutputStream(file);
- memory.writeTo(out);
+ memory.flip();
+ out.getChannel().write(memory);
}
- current = out;
+ out.write(b);
}
-
- current.write(b);
}
@Override
public void flush() throws IOException {
- memory.flush();
if (out != null) {
out.flush();
}
@@ -85,7 +109,6 @@ public class FileBackedMemoryOutputStrea
@Override
public void close() throws IOException {
- memory.close();
if (out != null) {
out.close();
}
@@ -100,10 +123,12 @@ public class FileBackedMemoryOutputStrea
if (file != null) {
return file.length();
}
- return memory.size();
+ return memory.position();
}
public void clean() {
+ memory.clear();
+ memory.rewind();
if (file != null) {
file.delete();
}
@@ -113,7 +138,8 @@ public class FileBackedMemoryOutputStrea
if (file != null) {
return new FileInputStream(file);
}
- return new ByteArrayInputStream(memory.toByteArray());
+ memory.flip();
+ return new ByteBufferBackedInputStream(memory);
}
}
Modified:
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java?rev=1754735&r1=1754734&r2=1754735&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java
(original)
+++
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java
Mon Aug 1 11:21:08 2016
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
+import
org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream.MemoryUnit;
import org.junit.Test;
public class FileBackedMemoryOutputStreamTest {
@@ -36,6 +37,8 @@ public class FileBackedMemoryOutputStrea
@Test
public void justKeepDataInMemory() throws IOException {
FileBackedMemoryOutputStream output = new
FileBackedMemoryOutputStream(10,
+
MemoryUnit.BYTES,
+
false,
new File("/tmp"),
"FileBackedMemoryOutputStreamTest.justKeepDataInMemory",
".tmp");
@@ -51,6 +54,8 @@ public class FileBackedMemoryOutputStrea
@Test
public void backedToFile() throws IOException {
FileBackedMemoryOutputStream output = new
FileBackedMemoryOutputStream(2,
+
MemoryUnit.BYTES,
+
false,
new File("/tmp"),
"FileBackedMemoryOutputStreamTest.backedToFile",
".tmp");