mosche commented on code in PR #24009: URL: https://github.com/apache/beam/pull/24009#discussion_r1026478459
########## runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.structuredstreaming.translation; + +import java.util.Collection; +import java.util.concurrent.Callable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables; +import org.apache.spark.api.java.function.ForeachFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.ExplainMode; +import org.apache.spark.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link EvaluationContext} is the result of a pipeline {@link PipelineTranslator#translate + * translation} and can be used to evaluate / run the pipeline. + * + * <p>However, in some cases pipeline translation involves the early evaluation of some parts of the + * pipeline. For example, this is necessary to materialize side-inputs. The {@link + * EvaluationContext} won't re-evaluate such datasets. + */ +@Internal +public final class EvaluationContext { + private static final Logger LOG = LoggerFactory.getLogger(EvaluationContext.class); + + interface NamedDataset<T> { + String name(); + + @Nullable + Dataset<WindowedValue<T>> dataset(); + } + + private final Collection<? extends NamedDataset<?>> leaveDatasets; + private final SparkSession session; + + EvaluationContext(Collection<? extends NamedDataset<?>> leaveDatasets, SparkSession session) { + this.leaveDatasets = leaveDatasets; + this.session = session; + } + + /** Trigger evaluation of all leave datasets. */ + public void evaluate() { + for (NamedDataset<?> ds : leaveDatasets) { + final Dataset<?> dataset = ds.dataset(); + if (dataset == null) { + continue; + } + if (LOG.isDebugEnabled()) { + ExplainMode explainMode = ExplainMode.fromString("simple"); + String execPlan = dataset.queryExecution().explainString(explainMode); + LOG.debug("Evaluating dataset {}:\n{}", ds.name(), execPlan); + } + // force evaluation using a dummy foreach action + evaluate(ds.name(), () -> dataset.foreach(NOOP)); + } + } + + /** + * The purpose of this utility is to mark the evaluation of Spark actions, both during Pipeline + * translation, when evaluation is required, and when finally evaluating the pipeline. + */ + public static void evaluate(String name, Runnable action) { + long startMs = System.currentTimeMillis(); + try { + action.run(); + LOG.info("Evaluated dataset {} in {}", name, durationSince(startMs)); + } catch (RuntimeException e) { + LOG.error("Failed to evaluate dataset {}: {}", name, Throwables.getRootCause(e).getMessage()); + throw new RuntimeException(e); + } + } + + /** + * The purpose of this utility is to mark the evaluation of Spark actions, both during Pipeline + * translation, when evaluation is required, and when finally evaluating the pipeline. + */ + public static <T> T evaluate(String name, Callable<T> action) { Review Comment: It was used here https://github.com/apache/beam/commit/dd6d0781c7b0321999168586da058d8b66d0b138#diff-d7904f7a21f6d5b3aeccce61863bcca39db112696d972762deca5b4623c87da6L229-L230 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
