This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 19232aba8f01b42360199c481ed08856f4e65522 Author: Jiale He <35652389+jial...@users.noreply.github.com> AuthorDate: Thu Sep 29 15:31:01 2022 +0800 KYLIN-5319 Earlier Init Segment LayoutInfo In FilePruner --- .../kylin/metadata/cube/model/NDataSegment.java | 4 +- .../kylin/metadata/cube/model/NDataflow.java | 17 +++- .../metadata/cube/model/NDataflowManager.java | 57 ++++++++++---- .../kylin/metadata/cube/model/NDataflowTest.java | 91 ++++++++++++++++++++-- .../sql/execution/datasource/FilePruner.scala | 21 ++--- .../sql/execution/datasource/FilePrunerSuite.scala | 10 +-- 6 files changed, 161 insertions(+), 39 deletions(-) diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java index 9f9c3dfec3..a3abc92b37 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java @@ -34,12 +34,12 @@ import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.model.TimeRange; -import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.util.MultiPartitionUtil; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -303,7 +303,7 @@ public class NDataSegment implements ISegment, Serializable { return getLayoutInfo().isAlreadyBuilt(layoutId); } - private LayoutInfo getLayoutInfo() { + public LayoutInfo getLayoutInfo() { if (layoutInfo == null) { synchronized (this) { if (layoutInfo == null) { diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java index 37936ba4c0..58e3c3abb2 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java @@ -36,9 +36,13 @@ import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.MissingRootPersistentEntity; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.metadata.cube.optimization.FrequencyMap; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IStorageAware; import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.NDataModel; +import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.metadata.model.NTableMetadataManager; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.Segments; @@ -48,10 +52,6 @@ import org.apache.kylin.metadata.realization.CapabilityResult; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.realization.SQLDigest; -import org.apache.kylin.metadata.cube.optimization.FrequencyMap; -import org.apache.kylin.metadata.model.NDataModel; -import org.apache.kylin.metadata.model.NDataModelManager; -import org.apache.kylin.metadata.model.NTableMetadataManager; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -585,4 +585,13 @@ public class NDataflow extends RootPersistentEntity implements Serializable, IRe public boolean hasReadySegments() { return isReady() && CollectionUtils.isNotEmpty(getQueryableSegments()); } + + public void initAllSegLayoutInfo() { + getSegments().forEach(NDataSegment::getLayoutInfo); + } + + public void initSegLayoutInfoById(Set<String> segmentIdList) { + getSegments(segmentIdList).forEach(NDataSegment::getLayoutInfo); + } + } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java index 53eee8b63c..f22dd1838d 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java @@ -22,8 +22,8 @@ import static java.util.stream.Collectors.groupingBy; import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_MERGE_CHECK_INDEX_ILLEGAL; import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_MERGE_CHECK_PARTITION_ILLEGAL; import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_MERGE_CONTAINS_GAPS; -import static org.apache.kylin.metadata.realization.RealizationStatusEnum.ONLINE; import static org.apache.kylin.common.util.SegmentMergeStorageChecker.checkMergeSegmentThreshold; +import static org.apache.kylin.metadata.realization.RealizationStatusEnum.ONLINE; import java.util.ArrayList; import java.util.Arrays; @@ -41,28 +41,28 @@ import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.metadata.cachesync.CachedCrudAssist; +import org.apache.kylin.metadata.model.ManagementType; +import org.apache.kylin.metadata.model.NDataModel; +import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.metadata.model.NTableMetadataManager; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TimeRange; +import org.apache.kylin.metadata.model.util.scd2.SCD2CondChecker; +import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.IRealizationProvider; import org.apache.kylin.metadata.realization.RealizationStatusEnum; -import org.apache.kylin.common.persistence.transaction.UnitOfWork; -import org.apache.kylin.metadata.model.ManagementType; -import org.apache.kylin.metadata.model.NDataModel; -import org.apache.kylin.metadata.model.NDataModelManager; -import org.apache.kylin.metadata.model.NTableMetadataManager; -import org.apache.kylin.metadata.model.util.scd2.SCD2CondChecker; -import org.apache.kylin.metadata.project.NProjectManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -258,10 +258,7 @@ public class NDataflowManager implements IRealizationProvider { } public NDataflow getDataflow(String id) { - if (StringUtils.isEmpty(id)) { - return null; - } - return crud.get(id); + return getDataflow(id, false); } public NDataflow getDataflowByModelAlias(String name) { @@ -874,4 +871,38 @@ public class NDataflowManager implements IRealizationProvider { return offlineManually || isOfflineMultiPartitionModel || isOfflineScdModel; } + /** + * get dataflow choose whether init all Segment LayoutInfo. + * Segment LayoutInfo is lazy load, It can be loaded immediately if needed. + */ + public NDataflow getDataflow(String id, boolean loadSegLayoutInfo) { + if (StringUtils.isEmpty(id)) { + return null; + } + NDataflow dataflow = crud.get(id); + if (!loadSegLayoutInfo) { + return dataflow; + } + dataflow.initAllSegLayoutInfo(); + return dataflow; + } + + /** + * get dataflow and init specified Segment LayoutInfo. + */ + public NDataflow getDataflow(String id, Set<String> segmentIds) { + if (StringUtils.isEmpty(id)) { + return null; + } + NDataflow dataflow = getDataflow(id, false); + if (CollectionUtils.isEmpty(segmentIds)) { + return dataflow; + } + if (Objects.isNull(dataflow)) { + return null; + } + dataflow.initSegLayoutInfoById(segmentIds); + return dataflow; + } + } diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowTest.java index f962c863a7..983d70c94b 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowTest.java @@ -20,19 +20,22 @@ package org.apache.kylin.metadata.cube.model; import java.io.IOException; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.realization.RealizationStatusEnum; -import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.springframework.test.util.ReflectionTestUtils; +import io.kyligence.kap.guava20.shaded.common.collect.Sets; import lombok.val; import lombok.var; public class NDataflowTest extends NLocalFileMetadataTestCase { - private String projectDefault = "default"; + private final String projectDefault = "default"; + private final String projectStreaming = "streaming_test"; @Before public void setUp() throws Exception { @@ -76,9 +79,8 @@ public class NDataflowTest extends NLocalFileMetadataTestCase { Assert.assertEquals(indexPlanConfig.base(), config.base()); Assert.assertEquals(2, config.getExtendedOverrides().size()); - indexPlanManager.updateIndexPlan("89af4ee2-2cdb-4b07-b39e-4c29856309aa", copyForWrite -> { - copyForWrite.getOverrideProps().put("test", "test"); - }); + indexPlanManager.updateIndexPlan("89af4ee2-2cdb-4b07-b39e-4c29856309aa", + copyForWrite -> copyForWrite.getOverrideProps().put("test", "test")); config = df.getConfig(); Assert.assertEquals(indexPlanConfig.base(), config.base()); @@ -103,7 +105,7 @@ public class NDataflowTest extends NLocalFileMetadataTestCase { @Test public void testCollectPrecalculationResource_Streaming() { - val dsMgr = NDataflowManager.getInstance(getTestConfig(), "streaming_test"); + val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming); val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d"); val strings = df.collectPrecalculationResource(); Assert.assertEquals(7, strings.size()); @@ -122,4 +124,81 @@ public class NDataflowTest extends NLocalFileMetadataTestCase { Assert.assertTrue( strings.stream().anyMatch(path -> path.startsWith("/streaming_test/kafka/DEFAULT.SSB_STREAMING.json"))); } + + @Test + public void testGetDataflow() { + val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming); + { + val df = dsMgr.getDataflow(null); + Assert.assertNull(df); + } + + { + val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d"); + Assert.assertNotNull(df); + } + + { + val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d-AAA", Sets.newHashSet("1")); + Assert.assertNull(df); + } + + { + val df = dsMgr.getDataflow(null, Sets.newHashSet("1")); + Assert.assertNull(df); + } + } + + @Test + public void testLazyLoadSegmentDetail() { + val fieldName = "layoutInfo"; + val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming); + val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d"); + + df.getSegments().forEach(segment -> { + // lazy init Segment LayoutInfo, it is null + Object layoutInfoBefore = ReflectionTestUtils.getField(segment, fieldName); + Assert.assertNull(layoutInfoBefore); + + // init Segment LayoutInfo, it is not null + segment.getLayoutInfo(); + Object layoutInfoAfter = ReflectionTestUtils.getField(segment, fieldName); + Assert.assertNotNull(layoutInfoAfter); + }); + } + + @Test + public void testLoadSegmentDetail() { + val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming); + // init Segment LayoutInfo right now + val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d", true); + df.getSegments().forEach(segment -> { + val layoutInfoAfter = ReflectionTestUtils.getField(segment, "layoutInfo"); + Assert.assertNotNull(layoutInfoAfter); + }); + } + + @Test + public void testLoadSpecifiedSegmentDetail() { + val dataflowId = "4965c827-fbb4-4ea1-a744-3f341a3b030d"; + val segmentId = "3e560d22-b749-48c3-9f64-d4230207f120"; + val fieldName = "layoutInfo"; + + val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming); + { + val df = dsMgr.getDataflow(dataflowId, Sets.newHashSet()); + val segment = df.getSegment(segmentId); + val layoutInfo = ReflectionTestUtils.getField(segment, fieldName); + Assert.assertNull(layoutInfo); + } + + { + // init Specified Segment LayoutInfo + val df = dsMgr.getDataflow(dataflowId, Sets.newHashSet(segmentId)); + val segmentAfter = df.getSegment(segmentId); + val layoutInfo = ReflectionTestUtils.getField(segmentAfter, fieldName); + Assert.assertNotNull(layoutInfo); + } + } + } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala index 9937f3628b..8e8c046410 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.datasource +import io.kyligence.kap.guava20.shaded.common.collect.Sets + import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.kylin.common.exception.TargetSegmentNotFoundException import org.apache.kylin.common.util.{DateFormat, HadoopUtil} @@ -39,6 +41,7 @@ import org.apache.spark.util.collection.BitSet import java.sql.{Date, Timestamp} import java.util import scala.collection.JavaConverters._ +import scala.collection.mutable case class SegmentDirectory(segmentID: String, partitions: List[Long], files: Seq[FileStatus]) @@ -79,9 +82,11 @@ class FilePruner(val session: SparkSession, private val dataflow: NDataflow = { val dataflowId = options.getOrElse("dataflowId", sys.error("dataflowId option is required")) val prj = options.getOrElse("project", sys.error("project option is required")) + val prunedSegmentIds = Sets.newHashSet(prunedSegmentDirs.map(_.segmentID).asJavaCollection) val dfMgr = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv, prj) - val dataflow = dfMgr.getDataflow(dataflowId) - FilePruner.checkSegmentStatus(prunedSegmentDirs, dataflow) + // init pruned Segment LayoutInfo immediately + val dataflow = dfMgr.getDataflow(dataflowId, prunedSegmentIds) + FilePruner.checkSegmentStatus(prunedSegmentIds, dataflow) dataflow } @@ -546,14 +551,12 @@ object FilePruner { } } - def checkSegmentStatus(segDirs: Seq[SegmentDirectory], dataflow: NDataflow): Unit = { + def checkSegmentStatus(prunedSegmentIds: util.HashSet[String], dataflow: NDataflow): Unit = { // check whether each segment id corresponds to the segment in NDataflow - val candidateSegIds = new util.HashSet[String] - segDirs.foreach(seg => candidateSegIds.add(seg.segmentID)) - val filterSegmentIds = dataflow.getSegments(candidateSegIds).asScala.map(e => e.getId).toSet - if(candidateSegIds.size != filterSegmentIds.size) { - val missSegId = new StringBuilder - candidateSegIds.asScala.foreach(e => { + val filterSegmentIds = dataflow.getSegments(prunedSegmentIds).asScala.map(e => e.getId).toSet + if (prunedSegmentIds.size != filterSegmentIds.size) { + val missSegId = new mutable.StringBuilder + prunedSegmentIds.asScala.foreach(e => { if (!filterSegmentIds.contains(e)) { missSegId.append(e).append(";") } diff --git a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala index 1bc6191e65..240bd85b86 100644 --- a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala +++ b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala @@ -18,11 +18,14 @@ package org.apache.spark.sql.execution.datasource +import io.kyligence.kap.guava20.shaded.common.collect.Sets import org.apache.kylin.common.exception.TargetSegmentNotFoundException import org.apache.kylin.metadata.cube.model.{NDataSegment, NDataflow} import org.apache.kylin.metadata.model.{SegmentStatusEnum, Segments} import org.apache.spark.sql.common.SparderBaseFunSuite +import scala.collection.JavaConverters._ + class FilePrunerSuite extends SparderBaseFunSuite { test("KE-37730: test check segment status") { @@ -37,12 +40,9 @@ class FilePrunerSuite extends SparderBaseFunSuite { val segDir1 = SegmentDirectory("1", List.empty[Long], null) val segDir2 = SegmentDirectory("2", List.empty[Long], null) - val segDirSeq1 = Seq(segDir1) - FilePruner.checkSegmentStatus(segDirSeq1, mockDataFlow) - - val segDirSeq2 = Seq(segDir1, segDir2) + FilePruner.checkSegmentStatus(Sets.newHashSet(Seq(segDir1).map(_.segmentID).asJavaCollection), mockDataFlow) val catchEx = intercept[TargetSegmentNotFoundException] { - FilePruner.checkSegmentStatus(segDirSeq2, mockDataFlow) + FilePruner.checkSegmentStatus(Sets.newHashSet(Seq(segDir1, segDir2).map(_.segmentID).asJavaCollection), mockDataFlow) } assert(catchEx.getMessage.equals("Cannot find target segment, and missing segment id: 2;")) }