KYLIN-2783 fix serialization
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1cae1f5b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1cae1f5b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1cae1f5b Branch: refs/heads/2.1.x Commit: 1cae1f5b7ab879f3ec0735e9d176a142556e4d23 Parents: cfda465 Author: Li Yang <liy...@apache.org> Authored: Fri Aug 11 10:47:04 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Fri Aug 11 12:17:54 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/cube/cuboid/DefaultCuboidScheduler.java | 5 +---- .../org/apache/kylin/engine/spark/SparkCubingByLayer.java | 10 +++------- 2 files changed, 4 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/1cae1f5b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java index 859ad20..b75acd5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java @@ -18,8 +18,6 @@ package org.apache.kylin.cube.cuboid; -import java.io.Serializable; - /** */ @@ -45,8 +43,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -@SuppressWarnings("serial") -public class DefaultCuboidScheduler extends CuboidScheduler implements Serializable { +public class DefaultCuboidScheduler extends CuboidScheduler { private final long max; private final Set<Long> allCuboidIds; private final Map<Long, List<Long>> parent2child; http://git-wip-us.apache.org/repos/asf/kylin/blob/1cae1f5b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index 2f54e25..544e072 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -44,7 +44,6 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.common.RowKeySplitter; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; import org.apache.kylin.cube.kv.RowKeyEncoderProvider; import org.apache.kylin.cube.model.CubeDesc; @@ -169,7 +168,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment); final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue())); - final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(vCubeDesc.getValue().getCuboidScheduler()); final int measureNum = cubeDesc.getMeasures().size(); int countMeasureIndex = 0; @@ -259,7 +257,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite); // aggregate to ND cuboids - PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder); + PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), ndCuboidBuilder); for (level = 1; level <= totalLevels; level++) { partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig); @@ -342,15 +340,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa CubeSegment cubeSegment; CubeDesc cubeDesc; - CuboidScheduler cuboidScheduler; NDCuboidBuilder ndCuboidBuilder; RowKeySplitter rowKeySplitter; transient boolean initialized = false; - CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) { + CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, NDCuboidBuilder ndCuboidBuilder) { this.cubeSegment = cubeSegment; this.cubeDesc = cubeDesc; - this.cuboidScheduler = cuboidScheduler; this.ndCuboidBuilder = ndCuboidBuilder; this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); } @@ -366,7 +362,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa long cuboidId = rowKeySplitter.split(key); Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId); - Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId); + Collection<Long> myChildren = cubeDesc.getCuboidScheduler().getSpanningCuboid(cuboidId); // if still empty or null if (myChildren == null || myChildren.size() == 0) {