Revert "KYLIN-980 spill to disk when sys available memory is low"

This reverts commit 0f8fc239162bbca913b1eceb380d89f674928400.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cfe9f0a8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cfe9f0a8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cfe9f0a8

Branch: refs/heads/1.x-HBase1.1.3
Commit: cfe9f0a8ac960b59281aefa0245e79ed4b534294
Parents: 6f93a4d
Author: shaofengshi <shaofeng...@apache.org>
Authored: Mon Nov 30 14:08:27 2015 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Nov 30 14:08:27 2015 +0800

----------------------------------------------------------------------
 .../common/util/MemoryBudgetController.java     | 249 -------------------
 .../hadoop/cube/FactDistinctColumnsReducer.java |  33 +--
 2 files changed, 8 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfe9f0a8/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java 
b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
deleted file mode 100644
index 4715ef6..0000000
--- 
a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.common.util;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class MemoryBudgetController {
-
-    private static final boolean debug = true;
-
-    public interface MemoryConsumer {
-        // return number MB released
-        int freeUp(int mb);
-    }
-
-    @SuppressWarnings("serial")
-    public static class NotEnoughBudgetException extends IllegalStateException 
{
-
-        public NotEnoughBudgetException() {
-            super();
-        }
-
-        public NotEnoughBudgetException(Throwable cause) {
-            super(cause);
-        }
-    }
-
-    private static class ConsumerEntry {
-        final MemoryConsumer consumer;
-        int reservedMB;
-
-        ConsumerEntry(MemoryConsumer consumer) {
-            this.consumer = consumer;
-        }
-    }
-
-    public static final MemoryBudgetController ZERO_BUDGET = new 
MemoryBudgetController(0);
-    public static final int ONE_MB = 1024 * 1024;
-    public static final long ONE_GB = 1024 * 1024 * 1024;
-
-    private static final Logger logger = 
LoggerFactory.getLogger(MemoryBudgetController.class);
-
-    // all budget numbers are in MB
-    private final int totalBudgetMB;
-    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = 
new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
-    private int totalReservedMB;
-    private final ReentrantLock lock = new ReentrantLock();
-
-    public MemoryBudgetController(int totalBudgetMB) {
-        Preconditions.checkArgument(totalBudgetMB >= 0);
-        Preconditions.checkState(totalBudgetMB <= getSystemAvailMB());
-        this.totalBudgetMB = totalBudgetMB;
-        this.totalReservedMB = 0;
-    }
-
-    public int getTotalBudgetMB() {
-        return totalBudgetMB;
-    }
-
-    public int getTotalReservedMB() {
-        lock.lock();
-        try {
-            return totalReservedMB;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public int getRemainingBudgetMB() {
-        lock.lock();
-        try {
-            return totalBudgetMB - totalReservedMB;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void reserveInsist(MemoryConsumer consumer, int requestMB) {
-        long waitStart = 0;
-        while (true) {
-            try {
-                reserve(consumer, requestMB);
-                if (debug && waitStart > 0)
-                    logger.debug(consumer + " waited " + 
(System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB 
request");
-                return;
-            } catch (NotEnoughBudgetException ex) {
-                // retry
-            }
-
-            if (waitStart == 0)
-                waitStart = System.currentTimeMillis();
-
-            synchronized (lock) {
-                try {
-                    lock.wait();
-                } catch (InterruptedException e) {
-                    throw new NotEnoughBudgetException(e);
-                }
-            }
-        }
-    }
-
-    /** reserve without wait, fail with NotEnoughBudgetException immediately 
if no mem */
-    public void reserve(MemoryConsumer consumer, int requestMB) {
-        if (totalBudgetMB == 0 && requestMB > 0)
-            throw new NotEnoughBudgetException();
-
-        boolean ok = false;
-        while (!ok) {
-            int gap = calculateGap(consumer, requestMB);
-            if (gap > 0) {
-                // to void deadlock, don't hold lock when invoking 
consumer.freeUp()
-                tryFreeUp(gap);
-            }
-            ok = updateBooking(consumer, requestMB);
-        }
-    }
-
-    private int calculateGap(MemoryConsumer consumer, int requestMB) {
-        lock.lock();
-        try {
-            ConsumerEntry entry = booking.get(consumer);
-            int curMB = entry == null ? 0 : entry.reservedMB;
-            int delta = requestMB - curMB;
-            return delta - (totalBudgetMB - totalReservedMB);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void tryFreeUp(int gap) {
-        // note don't hold lock when calling consumer.freeUp(), that method 
holding lock for itself and may cause deadlock
-        for (ConsumerEntry entry : booking.values()) {
-            int mb = entry.consumer.freeUp(gap);
-            if (mb > 0) {
-                lock.lock();
-                try {
-                    updateBookingWithDelta(entry.consumer, -mb);
-                } finally {
-                    lock.unlock();
-                }
-                gap -= mb;
-                if (gap <= 0)
-                    break;
-            }
-        }
-        if (gap > 0)
-            throw new NotEnoughBudgetException();
-
-        if (debug) {
-            if (getSystemAvailMB() < getRemainingBudgetMB()) {
-                logger.debug("Remaining budget is " + getRemainingBudgetMB() + 
" MB free, but system only has " + getSystemAvailMB() + " MB free. If this 
persists, some memory calculation must be wrong.");
-            }
-        }
-    }
-
-    private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
-        lock.lock();
-        try {
-            ConsumerEntry entry = booking.get(consumer);
-            if (entry == null) {
-                if (requestMB == 0)
-                    return true;
-
-                entry = new ConsumerEntry(consumer);
-                booking.put(consumer, entry);
-            }
-
-            int delta = requestMB - entry.reservedMB;
-            return updateBookingWithDelta(consumer, delta);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    // lock MUST be obtained before entering
-    private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) 
{
-        if (delta == 0)
-            return true;
-
-        ConsumerEntry entry = booking.get(consumer);
-        if (entry == null) {
-            if (delta <= 0)
-                return true;
-
-            entry = new ConsumerEntry(consumer);
-            booking.put(consumer, entry);
-        }
-
-        // double check gap again, it may be changed by other concurrent 
requests
-        if (delta > 0) {
-            int gap = delta - (totalBudgetMB - totalReservedMB);
-            if (gap > 0)
-                return false;
-        }
-
-        totalReservedMB += delta;
-        entry.reservedMB += delta;
-        if (entry.reservedMB == 0) {
-            booking.remove(entry.consumer);
-        }
-        if (debug) {
-            logger.debug(entry.consumer + " reserved " + entry.reservedMB + " 
MB, total reserved " + totalReservedMB + " MB, remaining budget " + 
getRemainingBudgetMB() + " MB");
-        }
-
-        if (delta < 0) {
-            synchronized (lock) {
-                lock.notifyAll();
-            }
-        }
-
-        return true;
-    }
-
-    public static long getSystemAvailBytes() {
-        Runtime runtime = Runtime.getRuntime();
-        long totalMemory = runtime.totalMemory(); // current heap allocated to 
the VM process
-        long freeMemory = runtime.freeMemory(); // out of the current heap, 
how much is free
-        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx 
setting
-        long usedMemory = totalMemory - freeMemory; // how much of the current 
heap the VM is using
-        long availableMemory = maxMemory - usedMemory; // available memory 
i.e. Maximum heap size minus the current amount used
-        return availableMemory;
-    }
-
-    public static int getSystemAvailMB() {
-        return (int) (getSystemAvailBytes() / ONE_MB);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfe9f0a8/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
 
b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index f18e840..89f90ba 100644
--- 
a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ 
b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -29,7 +29,6 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinReducer;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -39,7 +38,9 @@ import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
 
 /**
  * @author yangli9
@@ -68,34 +69,15 @@ public class FactDistinctColumnsReducer extends 
KylinReducer<ShortWritable, Text
         TblColRef col = columnList.get(key.get());
 
         HashSet<ByteArray> set = new HashSet<ByteArray>();
-        int count = 0;
         for (Text textValue : values) {
             ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 
0, textValue.getLength()));
             set.add(value);
-            count++;
-            if (count % 10000 == 0 && 
MemoryBudgetController.getSystemAvailMB() < 100) {
-                outputDistinctValues(col, set, context);
-                set.clear();
-            }
-        }
-
-        if (set.isEmpty() == false) {
-            outputDistinctValues(col, set, context);
         }
 
-    }
-
-    private void outputDistinctValues(TblColRef col, Set<ByteArray> set, 
Context context) throws IOException {
-        final Configuration conf = context.getConfiguration();
-        final FileSystem fs = FileSystem.get(conf);
-        final String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
-        final Path outputFile = new Path(outputPath, col.getName());
-        FSDataOutputStream out;
-        if (fs.exists(outputFile)) {
-            out = fs.append(outputFile);
-        } else {
-            out = fs.create(outputFile);
-        }
+        Configuration conf = context.getConfiguration();
+        FileSystem fs = FileSystem.get(conf);
+        String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+        FSDataOutputStream out = fs.create(new Path(outputPath, 
col.getName()));
 
         try {
             for (ByteArray value : set) {
@@ -105,6 +87,7 @@ public class FactDistinctColumnsReducer extends 
KylinReducer<ShortWritable, Text
         } finally {
             out.close();
         }
+
     }
 
 }

Reply via email to