Revert "KYLIN-980 spill to disk when sys available memory is low"
This reverts commit 0f8fc239162bbca913b1eceb380d89f674928400. Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cfe9f0a8 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cfe9f0a8 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cfe9f0a8 Branch: refs/heads/1.x-HBase1.1.3 Commit: cfe9f0a8ac960b59281aefa0245e79ed4b534294 Parents: 6f93a4d Author: shaofengshi <shaofeng...@apache.org> Authored: Mon Nov 30 14:08:27 2015 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Mon Nov 30 14:08:27 2015 +0800 ---------------------------------------------------------------------- .../common/util/MemoryBudgetController.java | 249 ------------------- .../hadoop/cube/FactDistinctColumnsReducer.java | 33 +-- 2 files changed, 8 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/cfe9f0a8/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java deleted file mode 100644 index 4715ef6..0000000 --- a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.util; - -import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; - -public class MemoryBudgetController { - - private static final boolean debug = true; - - public interface MemoryConsumer { - // return number MB released - int freeUp(int mb); - } - - @SuppressWarnings("serial") - public static class NotEnoughBudgetException extends IllegalStateException { - - public NotEnoughBudgetException() { - super(); - } - - public NotEnoughBudgetException(Throwable cause) { - super(cause); - } - } - - private static class ConsumerEntry { - final MemoryConsumer consumer; - int reservedMB; - - ConsumerEntry(MemoryConsumer consumer) { - this.consumer = consumer; - } - } - - public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0); - public static final int ONE_MB = 1024 * 1024; - public static final long ONE_GB = 1024 * 1024 * 1024; - - private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class); - - // all budget numbers are in MB - private final int totalBudgetMB; - private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>(); - private int totalReservedMB; - private final ReentrantLock lock = new ReentrantLock(); - - public MemoryBudgetController(int totalBudgetMB) { - Preconditions.checkArgument(totalBudgetMB >= 0); - Preconditions.checkState(totalBudgetMB <= getSystemAvailMB()); - this.totalBudgetMB = totalBudgetMB; - this.totalReservedMB = 0; - } - - public int getTotalBudgetMB() { - return totalBudgetMB; - } - - public int getTotalReservedMB() { - lock.lock(); - try { - return totalReservedMB; - } finally { - lock.unlock(); - } - } - - public int getRemainingBudgetMB() { - lock.lock(); - try { - return totalBudgetMB - totalReservedMB; - } finally { - lock.unlock(); - } - } - - public void reserveInsist(MemoryConsumer consumer, int requestMB) { - long waitStart = 0; - while (true) { - try { - reserve(consumer, requestMB); - if (debug && waitStart > 0) - logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request"); - return; - } catch (NotEnoughBudgetException ex) { - // retry - } - - if (waitStart == 0) - waitStart = System.currentTimeMillis(); - - synchronized (lock) { - try { - lock.wait(); - } catch (InterruptedException e) { - throw new NotEnoughBudgetException(e); - } - } - } - } - - /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */ - public void reserve(MemoryConsumer consumer, int requestMB) { - if (totalBudgetMB == 0 && requestMB > 0) - throw new NotEnoughBudgetException(); - - boolean ok = false; - while (!ok) { - int gap = calculateGap(consumer, requestMB); - if (gap > 0) { - // to void deadlock, don't hold lock when invoking consumer.freeUp() - tryFreeUp(gap); - } - ok = updateBooking(consumer, requestMB); - } - } - - private int calculateGap(MemoryConsumer consumer, int requestMB) { - lock.lock(); - try { - ConsumerEntry entry = booking.get(consumer); - int curMB = entry == null ? 0 : entry.reservedMB; - int delta = requestMB - curMB; - return delta - (totalBudgetMB - totalReservedMB); - } finally { - lock.unlock(); - } - } - - private void tryFreeUp(int gap) { - // note don't hold lock when calling consumer.freeUp(), that method holding lock for itself and may cause deadlock - for (ConsumerEntry entry : booking.values()) { - int mb = entry.consumer.freeUp(gap); - if (mb > 0) { - lock.lock(); - try { - updateBookingWithDelta(entry.consumer, -mb); - } finally { - lock.unlock(); - } - gap -= mb; - if (gap <= 0) - break; - } - } - if (gap > 0) - throw new NotEnoughBudgetException(); - - if (debug) { - if (getSystemAvailMB() < getRemainingBudgetMB()) { - logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong."); - } - } - } - - private boolean updateBooking(MemoryConsumer consumer, int requestMB) { - lock.lock(); - try { - ConsumerEntry entry = booking.get(consumer); - if (entry == null) { - if (requestMB == 0) - return true; - - entry = new ConsumerEntry(consumer); - booking.put(consumer, entry); - } - - int delta = requestMB - entry.reservedMB; - return updateBookingWithDelta(consumer, delta); - } finally { - lock.unlock(); - } - } - - // lock MUST be obtained before entering - private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) { - if (delta == 0) - return true; - - ConsumerEntry entry = booking.get(consumer); - if (entry == null) { - if (delta <= 0) - return true; - - entry = new ConsumerEntry(consumer); - booking.put(consumer, entry); - } - - // double check gap again, it may be changed by other concurrent requests - if (delta > 0) { - int gap = delta - (totalBudgetMB - totalReservedMB); - if (gap > 0) - return false; - } - - totalReservedMB += delta; - entry.reservedMB += delta; - if (entry.reservedMB == 0) { - booking.remove(entry.consumer); - } - if (debug) { - logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB"); - } - - if (delta < 0) { - synchronized (lock) { - lock.notifyAll(); - } - } - - return true; - } - - public static long getSystemAvailBytes() { - Runtime runtime = Runtime.getRuntime(); - long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process - long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free - long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting - long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using - long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used - return availableMemory; - } - - public static int getSystemAvailMB() { - return (int) (getSystemAvailBytes() / ONE_MB); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/cfe9f0a8/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java index f18e840..89f90ba 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java @@ -29,7 +29,6 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.mr.KylinReducer; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.cuboid.Cuboid; @@ -39,7 +38,9 @@ import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.apache.kylin.metadata.model.TblColRef; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; /** * @author yangli9 @@ -68,34 +69,15 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text TblColRef col = columnList.get(key.get()); HashSet<ByteArray> set = new HashSet<ByteArray>(); - int count = 0; for (Text textValue : values) { ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength())); set.add(value); - count++; - if (count % 10000 == 0 && MemoryBudgetController.getSystemAvailMB() < 100) { - outputDistinctValues(col, set, context); - set.clear(); - } - } - - if (set.isEmpty() == false) { - outputDistinctValues(col, set, context); } - } - - private void outputDistinctValues(TblColRef col, Set<ByteArray> set, Context context) throws IOException { - final Configuration conf = context.getConfiguration(); - final FileSystem fs = FileSystem.get(conf); - final String outputPath = conf.get(BatchConstants.OUTPUT_PATH); - final Path outputFile = new Path(outputPath, col.getName()); - FSDataOutputStream out; - if (fs.exists(outputFile)) { - out = fs.append(outputFile); - } else { - out = fs.create(outputFile); - } + Configuration conf = context.getConfiguration(); + FileSystem fs = FileSystem.get(conf); + String outputPath = conf.get(BatchConstants.OUTPUT_PATH); + FSDataOutputStream out = fs.create(new Path(outputPath, col.getName())); try { for (ByteArray value : set) { @@ -105,6 +87,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text } finally { out.close(); } + } }