This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new 30eefe3  [CARBONDATA-3920] Fix compaction failure issue for SI table and metadata mismatch in concurrency
30eefe3 is described below

commit 30eefe3d226e35bd7c9ed0f1c4c77c5b22809b76
Author: akashrn5 <akashnilu...@gmail.com>
AuthorDate: Mon Jul 20 14:50:49 2020 +0530

    [CARBONDATA-3920] Fix compaction failure issue for SI table and metadata mismatch in concurrency

    Why is this PR needed?
    1. When load and compaction run concurrently, reliability tests showed that
       segment data gets deleted from the SI table, which leads to exceptions
       and failures.
    2. In the compaction case, pre-priming was triggered for an SI table segment
       before the SI segment was marked as SUCCESS.

    What changes were proposed in this PR?
    1. Remove the unnecessary cleaning API call from the SI flow.
    2. Segment locks for SI were being released before compaction succeeded;
       hold them until it does.
    3. Refactor the SI load that runs after main table compaction so that
       pre-priming is triggered only after the segments are marked as SUCCESS.

    Does this PR introduce any user interface change?
    No

    Is any new testcase added?
    No (tested in cluster with 10 concurrency and around 1000 loads)

    This closes #3854
---
 .../org/apache/carbondata/core/index/Segment.java  |  5 ++
 .../spark/rdd/CarbonTableCompactor.scala           | 15 +++++
 .../events/CleanFilesPostEventListener.scala       | 68 +++++++++++++++++++++-
 .../SILoadEventListenerForFailedSegments.scala     | 23 +++++++-
 .../spark/sql/secondaryindex/load/Compactor.scala  | 28 ++++++++-
 .../secondaryindex/rdd/SecondaryIndexCreator.scala | 59 +++++++++++--------
 .../secondaryindex/util/SecondaryIndexUtil.scala   |  4 +-
 .../processing/merger/CarbonDataMergerUtil.java    |  6 +-
 8 files changed, 176 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/index/Segment.java b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
index 8fb22bc..202a7d4 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
@@ -275,6 +275,11 @@ public class Segment implements Serializable, Writable {
     return null;
   }
 
+  public static Segment getSegment(String segmentNo, String segmentFileName,
+      ReadCommittedScope readCommittedScope) {
+    return new Segment(segmentNo, segmentFileName, readCommittedScope);
+  }
+
   public Configuration getConfiguration() {
     return readCommittedScope.getConfiguration();
   }
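The CarbonTableCompactor change below implements the lock protocol described in the
commit message: every segment picked for compaction is locked up front, and the locks
are released only after compaction and the dependent SI work have finished, so a
concurrent clean-files or SI-repair run cannot delete data under an in-flight merge.
A minimal, self-contained sketch of that acquire/release shape, using a simplified
Lock trait as a stand-in for CarbonData's ICarbonLock (all names here are
illustrative, not the project's API):

trait Lock {
  def lockWithRetries(): Boolean
  def unlock(): Unit
}

// Acquire one lock per segment, run the compaction body, and release in a
// finally block so the locks are held for exactly the critical section.
def withSegmentLocks[T](locks: Seq[Lock])(compact: => T): T = {
  val acquired = locks.filter(_.lockWithRetries())
  try {
    compact
  } finally {
    acquired.foreach(_.unlock())
  }
}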
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 83d8935..3187f14 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{InputSplit, Job}
@@ -37,6 +38,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
@@ -93,6 +95,15 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
     deletePartialLoadsInCompaction()
     val compactedLoad = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
+    var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
+    loadsToMerge.asScala.foreach { segmentId =>
+      val segmentLock = CarbonLockFactory
+        .getCarbonLockObj(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+          .getAbsoluteTableIdentifier,
+          CarbonTablePath.addSegmentPrefix(segmentId.getLoadName) + LockUsage.LOCK)
+      segmentLock.lockWithRetries()
+      segmentLocks += segmentLock
+    }
     try {
       scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments, compactedLoad)
     } catch {
@@ -117,6 +128,10 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           Array(compactedLoadToClear))
       }
       throw e
+    } finally {
+      segmentLocks.foreach { segmentLock =>
+        segmentLock.unlock()
+      }
     }
 
     // scan again and determine if anything is there to merge again.
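Note that lockWithRetries() returns a Boolean rather than throwing, so callers decide
what a failed acquisition means. Its semantics can be pictured roughly as the loop
below; the retry count and back-off interval are assumptions for illustration, not
CarbonData's actual defaults:

import java.util.concurrent.TimeUnit

// Illustrative retry loop: attempt the lock a bounded number of times,
// sleeping between attempts, and report failure instead of throwing.
def lockWithRetries(tryLock: () => Boolean,
    retries: Int = 3,
    intervalSec: Long = 5): Boolean = {
  var attempt = 0
  while (attempt < retries) {
    if (tryLock()) {
      return true
    }
    attempt += 1
    if (attempt < retries) {
      TimeUnit.SECONDS.sleep(intervalSec)
    }
  }
  false
}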
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
index fa1e666..97f7836 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
@@ -24,11 +24,18 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{CleanFilesPostEvent, Event, OperationContext, OperationEventListener}
 
 class CleanFilesPostEventListener extends OperationEventListener with Logging {
@@ -54,7 +61,66 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
         SegmentStatusManager.deleteLoadsAndUpdateMetadata(
           indexTable, true, partitions.map(_.asJava).orNull)
         CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
+        cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
       }
     }
   }
+
+  /**
+   * This method cleans up the segments which are SUCCESS in the SI table but compacted
+   * or marked for delete in the main table, which can happen in concurrent scenarios.
+   */
+  def cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable: CarbonTable,
+      mainTable: CarbonTable): Unit = {
+    val mainTableStatusLock: ICarbonLock = CarbonLockFactory
+      .getCarbonLockObj(mainTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val indexTableStatusLock: ICarbonLock = CarbonLockFactory
+      .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    var mainTableLocked = false
+    var indexTableLocked = false
+    try {
+      mainTableLocked = mainTableStatusLock.lockWithRetries()
+      indexTableLocked = indexTableStatusLock.lockWithRetries()
+      if (mainTableLocked && indexTableLocked) {
+        val mainTableMetadataDetails =
+          SegmentStatusManager.readLoadMetadata(mainTable.getMetadataPath).toSet ++
+          SegmentStatusManager.readLoadHistoryMetadata(mainTable.getMetadataPath).toSet
+        val indexTableMetadataDetails =
+          SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath).toSet
+        val segToStatusMap = mainTableMetadataDetails
+          .map(detail => detail.getLoadName -> detail.getSegmentStatus).toMap
+
+        val unnecessarySegmentsOfSI = indexTableMetadataDetails.filter { indexDetail =>
+          indexDetail.getSegmentStatus.equals(SegmentStatus.SUCCESS) &&
+          segToStatusMap.contains(indexDetail.getLoadName) &&
+          (segToStatusMap(indexDetail.getLoadName).equals(SegmentStatus.COMPACTED) ||
+           segToStatusMap(indexDetail.getLoadName).equals(SegmentStatus.MARKED_FOR_DELETE))
+        }
+        LOGGER.info(s"Unwanted SI segments are: $unnecessarySegmentsOfSI")
+        unnecessarySegmentsOfSI.foreach { detail =>
+          val carbonFile = FileFactory
+            .getCarbonFile(CarbonTablePath
+              .getSegmentPath(indexTable.getTablePath, detail.getLoadName))
+          CarbonUtil.deleteFoldersAndFiles(carbonFile)
+        }
+        unnecessarySegmentsOfSI.foreach { detail =>
+          detail.setSegmentStatus(segToStatusMap(detail.getLoadName))
+          detail.setVisibility("false")
+        }
+
+        SegmentStatusManager.writeLoadDetailsIntoFile(
+          indexTable.getMetadataPath + CarbonTablePath.TABLE_STATUS_FILE,
+          unnecessarySegmentsOfSI.toArray)
+      } else {
+        LOGGER.error("Unable to get the lock file for main/index table. Please try again later")
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error("clean up of unwanted SI segments failed", ex)
+        // ignore the exception
+    } finally {
+      indexTableStatusLock.unlock()
+      mainTableStatusLock.unlock()
+    }
+  }
 }
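The listener's filter reduces to a join of the two table-status files on load name.
A self-contained sketch of that selection logic, with plain case classes standing in
for LoadMetadataDetails (types and names here are illustrative only):

sealed trait Status
case object Success extends Status
case object Compacted extends Status
case object MarkedForDelete extends Status
case class LoadDetail(loadName: String, status: Status)

// An SI segment is stale when it is still SUCCESS but its main-table
// counterpart has moved on to COMPACTED or MARKED_FOR_DELETE.
def staleSiSegments(main: Seq[LoadDetail], si: Seq[LoadDetail]): Seq[LoadDetail] = {
  val mainStatus = main.map(d => d.loadName -> d.status).toMap
  si.filter { detail =>
    detail.status == Success &&
      (mainStatus.get(detail.loadName).contains(Compacted) ||
        mainStatus.get(detail.loadName).contains(MarkedForDelete))
  }
}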
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index cae33ed..71c6559 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.secondaryindex.events
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 
 import org.apache.log4j.Logger
 import org.apache.spark.internal.Logging
@@ -33,7 +34,7 @@ import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
@@ -91,6 +92,7 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
                 SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
               val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
                 SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+              var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
               if (!isLoadSIForFailedSegments
                   || !CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
                 mainTblLoadMetadataDetails,
@@ -166,8 +168,19 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
                     if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
                         mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
                       detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                      LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName)
-                      failedLoadMetadataDetails.add(detail(0))
+                      // in a concurrent scenario, if a compaction is running on the table, the SI
+                      // segments are updated in the table status first and the main table
+                      // segment only afterwards, so any load running in parallel shouldn't pick
+                      // up those segments accidentally. So try to take the segment lock.
+                      val segmentLockOfProbableOngngCompactionSeg = CarbonLockFactory
+                        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+                          CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+                          LockUsage.LOCK)
+                      if (segmentLockOfProbableOngngCompactionSeg.lockWithRetries()) {
+                        segmentLocks += segmentLockOfProbableOngngCompactionSeg
+                        LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName)
+                        failedLoadMetadataDetails.add(detail(0))
+                      }
                     }
                   }
                 })
@@ -221,6 +234,10 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
                 LOGGER.error(s"Load to SI table to $indexTableName is failed " +
                              s"or SI table ENABLE is failed. ", ex)
                 return
+              } finally {
+                segmentLocks.foreach {
+                  segmentLock => segmentLock.unlock()
+                }
               }
             }
           }
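The guard added here is a try-lock filter: a segment is repaired only if its lock can
actually be taken, so anything still owned by an in-flight compaction is silently
skipped and retried on a later load. Sketched with the same simplified Lock trait as
in the first sketch (illustrative only):

// Keep only the candidates whose segment lock could be acquired; return the
// held locks too, so the caller can release them in its finally block.
def lockableSegments(candidates: Seq[(String, Lock)]): (Seq[String], Seq[Lock]) = {
  val picked = candidates.filter { case (_, lock) => lock.lockWithRetries() }
  (picked.map(_._1), picked.map(_._2))
}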
", ex) return + } finally { + segmentLocks.foreach { + segmentLock => segmentLock.unlock() + } } } } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala index 39b3d94..7f4ae00 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala @@ -19,19 +19,25 @@ package org.apache.spark.sql.secondaryindex.load import java.util import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer import org.apache.spark.rdd.CarbonMergeFilesRDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.index.CarbonIndexUtil import org.apache.spark.sql.secondaryindex.command.{IndexModel, SecondaryIndexModel} +import org.apache.spark.sql.secondaryindex.events.LoadTableSIPostExecutionEvent import org.apache.spark.sql.secondaryindex.rdd.SecondaryIndexCreator import org.apache.spark.sql.secondaryindex.util.SecondaryIndexUtil import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.locks.ICarbonLock import org.apache.carbondata.core.metadata.index.IndexType import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.events.OperationListenerBus +import org.apache.carbondata.indexserver.DistributedRDDUtils import org.apache.carbondata.processing.loading.model.CarbonLoadModel object Compactor { @@ -60,6 +66,7 @@ object Compactor { } else { java.util.Collections.emptyIterator() } + var allSegmentsLock: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer.empty while (iterator.hasNext) { val index = iterator.next() val indexColumns = index.getValue.get(CarbonCommonConstants.INDEX_COLUMNS).split(",").toList @@ -76,11 +83,12 @@ object Compactor { try { val segmentToSegmentTimestampMap: util.Map[String, String] = new java.util .HashMap[String, String]() - val indexCarbonTable = SecondaryIndexCreator + val (indexCarbonTable, segmentLocks, operationContext) = SecondaryIndexCreator .createSecondaryIndex(secondaryIndexModel, segmentToSegmentTimestampMap, null, forceAccessSegment, isCompactionCall = true, isLoadToFailedSISegments = false) + allSegmentsLock ++= segmentLocks CarbonInternalLoaderUtil.updateLoadMetadataWithMergeStatus( indexCarbonTable, loadsToMerge, @@ -122,6 +130,19 @@ object Compactor { segmentIdToLoadStartTimeMapping(validSegments.head), SegmentStatus.SUCCESS, carbonLoadModelForMergeDataFiles.getFactTimeStamp, rebuiltSegments.toList.asJava) + + // Index PrePriming for SI + DistributedRDDUtils.triggerPrepriming(sparkSession, indexCarbonTable, Seq(), + operationContext, FileFactory.getConfiguration, validSegments) + + val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent = + LoadTableSIPostExecutionEvent(sparkSession, + indexCarbonTable.getCarbonTableIdentifier, + secondaryIndexModel.carbonLoadModel, + indexCarbonTable) + OperationListenerBus.getInstance + .fireEvent(loadTableSIPostExecutionEvent, operationContext) + siCompactionIndexList ::= indexCarbonTable } catch { case ex: Exception => @@ -136,6 +157,11 @@ object Compactor { """.stripMargin).collect() } throw ex + } finally { + // once 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 04cfbb0..e897051 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -59,7 +59,7 @@ object SecondaryIndexCreator {
       indexTable: CarbonTable,
       forceAccessSegment: Boolean = false,
       isCompactionCall: Boolean,
-      isLoadToFailedSISegments: Boolean): CarbonTable = {
+      isLoadToFailedSISegments: Boolean): (CarbonTable, ListBuffer[ICarbonLock], OperationContext) = {
     var indexCarbonTable = indexTable
     val sc = secondaryIndexModel.sqlContext
     // get the thread pool size for secondary index creation
@@ -76,7 +76,7 @@ object SecondaryIndexCreator {
       indexCarbonTable = metastore
         .lookupRelation(Some(secondaryIndexModel.carbonLoadModel.getDatabaseName),
           secondaryIndexModel.secondaryIndex.indexName)(secondaryIndexModel.sqlContext
-          .sparkSession).asInstanceOf[CarbonRelation].carbonTable
+          .sparkSession).carbonTable
     }
 
     val operationContext = new OperationContext
@@ -119,6 +119,9 @@ object SecondaryIndexCreator {
       }
 
       validSegmentList = validSegments.asScala.toList
+      if (validSegmentList.isEmpty) {
+        return (indexCarbonTable, segmentLocks, operationContext)
+      }
       LOGGER.info(s"${indexCarbonTable.getTableUniqueName}: SI loading is started " +
                   s"for segments: $validSegmentList")
@@ -266,13 +269,15 @@ object SecondaryIndexCreator {
           rebuiltSegments)
       }
 
-      // Index PrePriming for SI
-      DistributedRDDUtils.triggerPrepriming(secondaryIndexModel.sqlContext.sparkSession,
-        indexCarbonTable,
-        Seq(),
-        operationContext,
-        FileFactory.getConfiguration,
-        validSegments.asScala.toList)
+      if (!isCompactionCall) {
+        // Index PrePriming for SI
+        DistributedRDDUtils.triggerPrepriming(secondaryIndexModel.sqlContext.sparkSession,
+          indexCarbonTable,
+          Seq(),
+          operationContext,
+          FileFactory.getConfiguration,
+          validSegments.asScala.toList)
+      }
 
       // update the status of all the segments to marked for delete if data load fails, so that
       // next load which is triggered for SI table in post event of main table data load clears
@@ -294,17 +299,24 @@ object SecondaryIndexCreator {
         LOGGER.error("Dataload to secondary index creation has failed")
       }
 
-      val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
-        LoadTableSIPostExecutionEvent(sc.sparkSession,
-          indexCarbonTable.getCarbonTableIdentifier,
-          secondaryIndexModel.carbonLoadModel,
-          indexCarbonTable)
-      OperationListenerBus.getInstance
-        .fireEvent(loadTableSIPostExecutionEvent, operationContext)
+      if (!isCompactionCall) {
+        val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
+          LoadTableSIPostExecutionEvent(sc.sparkSession,
+            indexCarbonTable.getCarbonTableIdentifier,
+            secondaryIndexModel.carbonLoadModel,
+            indexCarbonTable)
+        OperationListenerBus.getInstance
+          .fireEvent(loadTableSIPostExecutionEvent, operationContext)
+      }
 
-      indexCarbonTable
+      if (isCompactionCall) {
+        (indexCarbonTable, segmentLocks, operationContext)
+      } else {
+        (indexCarbonTable, ListBuffer.empty, operationContext)
+      }
     } catch {
       case ex: Exception =>
+        LOGGER.error("Load to SI table failed", ex)
         FileInternalUtil
           .updateTableStatus(validSegmentList,
             secondaryIndexModel.carbonLoadModel.getDatabaseName,
@@ -316,13 +328,8 @@ object SecondaryIndexCreator {
             String](),
           indexCarbonTable,
           sc.sparkSession)
-        LOGGER.error(ex)
         throw ex
     } finally {
-      // release the segment locks
-      segmentLocks.foreach(segmentLock => {
-        segmentLock.unlock()
-      })
       // if some segments are skipped, disable the SI table so that
       // SILoadEventListenerForFailedSegments will take care to load to these segments in next
       // consecutive load to main table.
@@ -339,7 +346,6 @@ object SecondaryIndexCreator {
         if (!isCompactionCall) {
           SegmentStatusManager
             .deleteLoadsAndUpdateMetadata(indexCarbonTable, false, null)
-          TableProcessingOperations.deletePartialLoadDataIfExist(indexCarbonTable, false)
         }
       } catch {
         case e: Exception =>
@@ -351,6 +357,13 @@ object SecondaryIndexCreator {
       if (null != executorService) {
         executorService.shutdownNow()
       }
+
+      // release the segment locks only for load flow
+      if (!isCompactionCall) {
+        segmentLocks.foreach(segmentLock => {
+          segmentLock.unlock()
+        })
+      }
     }
   }
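The changed return type is an ownership transfer: in the compaction flow the segment
locks travel back to the caller (Compactor), which releases them only after the whole
merge has succeeded, while in the plain load flow the method still releases them
itself. Roughly, again with the stand-in Lock trait (names hypothetical):

// Compaction flow: hand the live locks back to the caller.
// Load flow: release locally and hand back an empty list.
def createIndexSketch(isCompactionCall: Boolean, locks: List[Lock]): (String, List[Lock]) = {
  try {
    if (isCompactionCall) ("indexTable", locks) else ("indexTable", Nil)
  } finally {
    if (!isCompactionCall) locks.foreach(_.unlock())
  }
}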
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 498c739..034cd1c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -199,9 +199,11 @@ object SecondaryIndexUtil {
       }
       if (finalMergeStatus) {
         if (null != mergeStatus && mergeStatus.length != 0) {
+          val validSegmentsToUse = validSegments.asScala
+            .filter(segment => mergeStatus.map(_._2).toSet.contains(segment.getSegmentNo))
           deleteOldIndexOrMergeIndexFiles(
             carbonLoadModel.getFactTimeStamp,
-            validSegments,
+            validSegmentsToUse.toList.asJava,
             indexCarbonTable)
           mergedSegments.asScala.map { seg =>
             val file = SegmentFileStore.writeSegmentFile(
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 253a78b..da70164 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -933,10 +933,10 @@ public final class CarbonDataMergerUtil {
     for (LoadMetadataDetails segment : loadMetadataDetails) {
       //check if this load is an already merged load.
       if (null != segment.getMergedLoadName()) {
-
-        segments.add(Segment.toSegment(segment.getMergedLoadName(), null));
+        segments
+            .add(Segment.getSegment(segment.getMergedLoadName(), segment.getSegmentFile(), null));
       } else {
-        segments.add(Segment.toSegment(segment.getLoadName(), null));
+        segments.add(Segment.getSegment(segment.getLoadName(), segment.getSegmentFile(), null));
       }
     }
     return segments;
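The SecondaryIndexUtil change narrows the deletion set: only segments that actually
took part in the merge, i.e. whose segment number appears in mergeStatus, should have
their old index files deleted. The selection, reduced to a self-contained sketch (Seg
and the tuple layout are illustrative, not the project's types):

case class Seg(segmentNo: String)

// mergeStatus pairs carry the merged segment number in the second slot;
// keep only the valid segments that appear there.
def segmentsToClean(validSegments: Seq[Seg], mergeStatus: Seq[(String, String)]): Seq[Seg] = {
  val merged = mergeStatus.map(_._2).toSet
  validSegments.filter(seg => merged.contains(seg.segmentNo))
}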