[ https://issues.apache.org/jira/browse/MAHOUT-1817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15215213#comment-15215213 ]
ASF GitHub Bot commented on MAHOUT-1817:
----------------------------------------
Github user smarthi commented on a diff in the pull request:
https://github.com/apache/mahout/pull/203#discussion_r57661303
--- Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---
@@ -76,20 +94,38 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
override val keyClassTag: ClassTag[K] = classTag[K]
+ /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+ * the dataset to the filesystem and read it back when cache is called
*/
def cache() = {
if (!isCached) {
- cacheFileName = System.nanoTime().toString
+ cacheFileName = persistanceRootDir + System.nanoTime().toString
parallelismDeg = ds.getParallelism
isCached = true
+ persist(ds, cacheFileName)
}
- implicit val typeInformation = createTypeInformation[(K,Vector)]
+ val _ds = readPersistedDataSet(cacheFileName, ds)
+
+ /** Leave the parallelism degree to be set by the operators
+ * TODO: find out a way to set the parallelism degree based on the
+ * final drm after computation is actually triggered
+ *
+ * // We may want to look more closely at this:
+ * // since we've cached a drm, triggering a computation
+ * // it may not make sense to keep the same parallelism degree
+ * if (!(parallelismDeg == _ds.getParallelism)) {
+ * _ds.setParallelism(parallelismDeg).rebalance()
+ * }
+ *
+ */
--- End diff --
In light of MAHOUT-1819, wherein it's been agreed that parallelism would
only be set inside MahoutFlinkContext for the ExecutionEnvironment, I think
this JIRA can be safely marked as 'Resolved'.
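Because Flink 1.0.0 has no in-memory equivalent of Spark's cache, the patch above writes the DataSet to the filesystem once and reads it back on every `cache()` call. The sketch below illustrates that persist-once, read-back-always pattern in plain Scala, without Flink. It is only an illustration under stated assumptions: the class `PersistedRowCache`, its binary file layout, and the restriction to `Int` keys with dense `Array[Double]` rows are all hypothetical, not the Mahout implementation (which calls `persist` and `readPersistedDataSet` on a Flink `DataSet`).

```scala
import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}

// Hypothetical, Flink-free sketch of "caching by persisting": the first cache()
// call writes the computed rows to a file under a root directory (file name from
// System.nanoTime, as in the diff); every call then reads that file back instead
// of recomputing. Rows are (Int key, dense vector) pairs. Not Mahout/Flink API.
final class PersistedRowCache(compute: () => Seq[(Int, Array[Double])], persistenceRootDir: File) {
  private var cacheFile: Option[File] = None

  def isCached: Boolean = cacheFile.isDefined

  def cache(): Seq[(Int, Array[Double])] = {
    if (cacheFile.isEmpty) {
      val f = new File(persistenceRootDir, System.nanoTime().toString)
      persist(compute(), f) // the upstream computation runs only on the first call
      cacheFile = Some(f)
    }
    readPersisted(cacheFile.get) // every call reads the persisted copy back
  }

  // Assumed binary layout: row count, then per row: key, vector length, values.
  private def persist(rows: Seq[(Int, Array[Double])], f: File): Unit = {
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f)))
    try {
      out.writeInt(rows.size)
      rows.foreach { case (key, vec) =>
        out.writeInt(key)
        out.writeInt(vec.length)
        vec.foreach(v => out.writeDouble(v))
      }
    } finally out.close()
  }

  private def readPersisted(f: File): Seq[(Int, Array[Double])] = {
    val in = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))
    try {
      val n = in.readInt()
      Vector.fill(n) {
        val key = in.readInt()
        val len = in.readInt()
        (key, Array.fill(len)(in.readDouble()))
      }
    } finally in.close()
  }
}
```

Since `compute` is invoked only on the first `cache()` call, later reads cost a file scan rather than a recomputation, which is the trade-off the Flink bindings accept in the absence of native caching.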
> Implement caching in Flink Bindings
> -----------------------------------
>
> Key: MAHOUT-1817
> URL: https://issues.apache.org/jira/browse/MAHOUT-1817
> Project: Mahout
> Issue Type: New Feature
> Components: Flink
> Affects Versions: 0.11.2
> Reporter: Andrew Palumbo
> Assignee: Andrew Palumbo
> Priority: Blocker
> Fix For: 0.12.0
>
>
> Flink does not have in-memory caching analogous to that of Spark. We need
> to find a way to honour the {{checkpoint()}} contract in Flink Bindings.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)