KYLIN-980 spill to disk when available system memory is low

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0f8fc239
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0f8fc239
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0f8fc239

Branch: refs/heads/1.x-HBase1.1.3
Commit: 0f8fc239162bbca913b1eceb380d89f674928400
Parents: 4c44080
Author: shaofengshi <shaofeng...@apache.org>
Authored: Wed Nov 25 14:38:14 2015 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Wed Nov 25 14:40:07 2015 +0800

----------------------------------------------------------------------
 .../common/util/MemoryBudgetController.java     | 249 +++++++++++++++++++
 .../hadoop/cube/FactDistinctColumnsReducer.java |  33 ++-
 2 files changed, 274 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0f8fc239/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
new file mode 100644
index 0000000..4715ef6
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -0,0 +1,249 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class MemoryBudgetController {
+
+    private static final boolean debug = true;
+
+    public interface MemoryConsumer {
+        // return number MB released
+        int freeUp(int mb);
+    }
+
+    @SuppressWarnings("serial")
+    public static class NotEnoughBudgetException extends IllegalStateException 
{
+
+        public NotEnoughBudgetException() {
+            super();
+        }
+
+        public NotEnoughBudgetException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    private static class ConsumerEntry {
+        final MemoryConsumer consumer;
+        int reservedMB;
+
+        ConsumerEntry(MemoryConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
+
+    public static final MemoryBudgetController ZERO_BUDGET = new 
MemoryBudgetController(0);
+    public static final int ONE_MB = 1024 * 1024;
+    public static final long ONE_GB = 1024 * 1024 * 1024;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(MemoryBudgetController.class);
+
+    // all budget numbers are in MB
+    private final int totalBudgetMB;
+    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = 
new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+    private int totalReservedMB;
+    private final ReentrantLock lock = new ReentrantLock();
+
+    public MemoryBudgetController(int totalBudgetMB) {
+        Preconditions.checkArgument(totalBudgetMB >= 0);
+        Preconditions.checkState(totalBudgetMB <= getSystemAvailMB());
+        this.totalBudgetMB = totalBudgetMB;
+        this.totalReservedMB = 0;
+    }
+
+    public int getTotalBudgetMB() {
+        return totalBudgetMB;
+    }
+
+    public int getTotalReservedMB() {
+        lock.lock();
+        try {
+            return totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int getRemainingBudgetMB() {
+        lock.lock();
+        try {
+            return totalBudgetMB - totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void reserveInsist(MemoryConsumer consumer, int requestMB) {
+        long waitStart = 0;
+        while (true) {
+            try {
+                reserve(consumer, requestMB);
+                if (debug && waitStart > 0)
+                    logger.debug(consumer + " waited " + 
(System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB 
request");
+                return;
+            } catch (NotEnoughBudgetException ex) {
+                // retry
+            }
+
+            if (waitStart == 0)
+                waitStart = System.currentTimeMillis();
+
+            synchronized (lock) {
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    throw new NotEnoughBudgetException(e);
+                }
+            }
+        }
+    }
+
+    /** reserve without wait, fail with NotEnoughBudgetException immediately 
if no mem */
+    public void reserve(MemoryConsumer consumer, int requestMB) {
+        if (totalBudgetMB == 0 && requestMB > 0)
+            throw new NotEnoughBudgetException();
+
+        boolean ok = false;
+        while (!ok) {
+            int gap = calculateGap(consumer, requestMB);
+            if (gap > 0) {
+                // to void deadlock, don't hold lock when invoking 
consumer.freeUp()
+                tryFreeUp(gap);
+            }
+            ok = updateBooking(consumer, requestMB);
+        }
+    }
+
+    private int calculateGap(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            int curMB = entry == null ? 0 : entry.reservedMB;
+            int delta = requestMB - curMB;
+            return delta - (totalBudgetMB - totalReservedMB);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void tryFreeUp(int gap) {
+        // note don't hold lock when calling consumer.freeUp(), that method 
holding lock for itself and may cause deadlock
+        for (ConsumerEntry entry : booking.values()) {
+            int mb = entry.consumer.freeUp(gap);
+            if (mb > 0) {
+                lock.lock();
+                try {
+                    updateBookingWithDelta(entry.consumer, -mb);
+                } finally {
+                    lock.unlock();
+                }
+                gap -= mb;
+                if (gap <= 0)
+                    break;
+            }
+        }
+        if (gap > 0)
+            throw new NotEnoughBudgetException();
+
+        if (debug) {
+            if (getSystemAvailMB() < getRemainingBudgetMB()) {
+                logger.debug("Remaining budget is " + getRemainingBudgetMB() + 
" MB free, but system only has " + getSystemAvailMB() + " MB free. If this 
persists, some memory calculation must be wrong.");
+            }
+        }
+    }
+
+    private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            if (entry == null) {
+                if (requestMB == 0)
+                    return true;
+
+                entry = new ConsumerEntry(consumer);
+                booking.put(consumer, entry);
+            }
+
+            int delta = requestMB - entry.reservedMB;
+            return updateBookingWithDelta(consumer, delta);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // lock MUST be obtained before entering
+    private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) 
{
+        if (delta == 0)
+            return true;
+
+        ConsumerEntry entry = booking.get(consumer);
+        if (entry == null) {
+            if (delta <= 0)
+                return true;
+
+            entry = new ConsumerEntry(consumer);
+            booking.put(consumer, entry);
+        }
+
+        // double check gap again, it may be changed by other concurrent 
requests
+        if (delta > 0) {
+            int gap = delta - (totalBudgetMB - totalReservedMB);
+            if (gap > 0)
+                return false;
+        }
+
+        totalReservedMB += delta;
+        entry.reservedMB += delta;
+        if (entry.reservedMB == 0) {
+            booking.remove(entry.consumer);
+        }
+        if (debug) {
+            logger.debug(entry.consumer + " reserved " + entry.reservedMB + " 
MB, total reserved " + totalReservedMB + " MB, remaining budget " + 
getRemainingBudgetMB() + " MB");
+        }
+
+        if (delta < 0) {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
+        }
+
+        return true;
+    }
+
+    public static long getSystemAvailBytes() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to 
the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, 
how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx 
setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current 
heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory 
i.e. Maximum heap size minus the current amount used
+        return availableMemory;
+    }
+
+    public static int getSystemAvailMB() {
+        return (int) (getSystemAvailBytes() / ONE_MB);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0f8fc239/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index 89f90ba..f18e840 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -29,6 +29,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinReducer;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -38,9 +39,7 @@ import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
+import java.util.*;
 
 /**
  * @author yangli9
@@ -69,15 +68,34 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
         TblColRef col = columnList.get(key.get());
 
         HashSet<ByteArray> set = new HashSet<ByteArray>();
+        int count = 0;
         for (Text textValue : values) {
             ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 
0, textValue.getLength()));
             set.add(value);
+            count++;
+            if (count % 10000 == 0 && 
MemoryBudgetController.getSystemAvailMB() < 100) {
+                outputDistinctValues(col, set, context);
+                set.clear();
+            }
         }
 
-        Configuration conf = context.getConfiguration();
-        FileSystem fs = FileSystem.get(conf);
-        String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
-        FSDataOutputStream out = fs.create(new Path(outputPath, 
col.getName()));
+        if (set.isEmpty() == false) {
+            outputDistinctValues(col, set, context);
+        }
+
+    }
+
+    private void outputDistinctValues(TblColRef col, Set<ByteArray> set, 
Context context) throws IOException {
+        final Configuration conf = context.getConfiguration();
+        final FileSystem fs = FileSystem.get(conf);
+        final String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+        final Path outputFile = new Path(outputPath, col.getName());
+        FSDataOutputStream out;
+        if (fs.exists(outputFile)) {
+            out = fs.append(outputFile);
+        } else {
+            out = fs.create(outputFile);
+        }
 
         try {
             for (ByteArray value : set) {
@@ -87,7 +105,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
         } finally {
             out.close();
         }
-
     }
 
 }

Reply via email to