Repository: incubator-gobblin Updated Branches: refs/heads/master cd5222775 -> 4b5f55d08
[GOBBLIN-558] submitting tracking events during flow compilation and orchestration in GaaS Closes #2420 from arjun4084346/trackingEventsInGaaS Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/4b5f55d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/4b5f55d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/4b5f55d0 Branch: refs/heads/master Commit: 4b5f55d08742ce4c27e216849f97fbef9da9ac59 Parents: cd52227 Author: Arjun <[email protected]> Authored: Mon Aug 13 16:28:41 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Aug 13 16:28:41 2018 -0700 ---------------------------------------------------------------------- .../cluster/GobblinClusterMetricTagNames.java | 9 +-- .../cluster/GobblinHelixJobLauncher.java | 12 ++-- .../gobblin/metrics/event/TimingEvent.java | 15 ++++ .../flow/IdentityFlowToJobSpecCompiler.java | 3 +- .../modules/flow/MockedSpecCompiler.java | 75 ++++++++++++++++++++ .../service/modules/flow/SpecCompiler.java | 2 + .../modules/orchestration/Orchestrator.java | 65 ++++++++++++++++- 7 files changed, 165 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterMetricTagNames.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterMetricTagNames.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterMetricTagNames.java index 34c865c..ccf1f14 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterMetricTagNames.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterMetricTagNames.java @@ -18,10 +18,12 @@ package org.apache.gobblin.cluster; 
import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.metrics.event.TimingEvent; /** * A central place for constants of {@link org.apache.gobblin.metrics.MetricContext} tag names for a Gobblin cluster. + * Some shared constants have been moved to {@link TimingEvent.FlowEventConstants}. * * @author Yinan Li */ @@ -32,11 +34,4 @@ public class GobblinClusterMetricTagNames { public static final String APPLICATION_ID = "application.id"; public static final String HELIX_INSTANCE_NAME = "helix.instance.name"; public static final String TASK_RUNNER_ID = "task.runner.id"; - - public static final String FLOW_GROUP = "flowGroup"; - public static final String FLOW_NAME = "flowName"; - public static final String FLOW_EXECUTION_ID = "flowExecutionId"; - public static final String JOB_GROUP = "jobGroup"; - public static final String JOB_NAME = "jobName"; - public static final String JOB_EXECUTION_ID = "jobExecutionId"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index cd37342..e2447a5 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -433,21 +433,21 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { // only inject flow tags if a flow name is defined if (jobProps.containsKey(ConfigurationKeys.FLOW_NAME_KEY)) { - metadataTags.add(new Tag<>(GobblinClusterMetricTagNames.FLOW_GROUP, + metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, ""))); metadataTags.add( - 
new Tag<>(GobblinClusterMetricTagNames.FLOW_NAME, jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY))); + new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY))); // use job execution id if flow execution id is not present - metadataTags.add(new Tag<>(GobblinClusterMetricTagNames.FLOW_EXECUTION_ID, + metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobExecutionId))); } - metadataTags.add(new Tag<>(GobblinClusterMetricTagNames.JOB_GROUP, + metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, ""))); - metadataTags.add(new Tag<>(GobblinClusterMetricTagNames.JOB_NAME, + metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, ""))); - metadataTags.add(new Tag<>(GobblinClusterMetricTagNames.JOB_EXECUTION_ID, jobExecutionId)); + metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, jobExecutionId)); LOGGER.debug("GobblinHelixJobLauncher.addAdditionalMetadataTags: metadataTags {}", metadataTags); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java index a7ad821..b00465f 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java @@ -31,6 +31,7 @@ public class 
TimingEvent { public static final String FULL_JOB_EXECUTION = "FullJobExecutionTimer"; public static final String WORK_UNITS_CREATION = "WorkUnitsCreationTimer"; public static final String WORK_UNITS_PREPARATION = "WorkUnitsPreparationTimer"; + public static final String JOB_ORCHESTRATED = "JobOrchestrated"; public static final String JOB_PREPARE = "JobPrepareTimer"; public static final String JOB_START = "JobStartTimer"; public static final String JOB_RUN = "JobRunTimer"; @@ -53,6 +54,20 @@ public class TimingEvent { public static final String HELIX_JOB_RUN = "JobHelixRunTimer"; } + public static class FlowTimings { + public static final String FLOW_COMPILED = "FlowCompiled"; + } + + public static class FlowEventConstants { + public static final String FLOW_NAME_FIELD = "flowName"; + public static final String FLOW_GROUP_FIELD = "flowGroup"; + public static final String FLOW_EXECUTION_ID_FIELD = "flowExecutionId"; + public static final String JOB_NAME_FIELD = "jobName"; + public static final String JOB_GROUP_FIELD = "jobGroup"; + public static final String JOB_EXECUTION_ID_FIELD = "jobExecutionId"; + public static final String SPEC_EXECUTOR_FIELD = "specExecutor"; + } + public static final String METADATA_START_TIME = "startTime"; public static final String METADATA_END_TIME = "endTime"; public static final String METADATA_DURATION = "durationMillis"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java index fa843c5..42c4926 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java +++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java @@ -96,8 +96,7 @@ public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); for (TopologySpec topologySpec : topologySpecMap.values()) { - Map<ServiceNode, ServiceNode> capabilities = - (Map<ServiceNode, ServiceNode>) topologySpec.getSpecExecutor().getCapabilities().get(); + Map<ServiceNode, ServiceNode> capabilities = topologySpec.getSpecExecutor().getCapabilities().get(); for (Map.Entry<ServiceNode, ServiceNode> capability : capabilities.entrySet()) { log.info(String.format("Evaluating current JobSpec: %s against TopologySpec: %s with " + "capability of source: %s and destination: %s ", jobSpec.getUri(), topologySpec.getUri(), http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java new file mode 100644 index 0000000..7a9e5db --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flow; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * This mocked SpecCompiler class creates 3 dummy job specs to emulate a multi-hop flow spec compiler. + * It uses {@link InMemorySpecExecutor} for these dummy specs. 
+ */ +public class MockedSpecCompiler extends IdentityFlowToJobSpecCompiler { + + private static final int NUMBER_OF_JOBS = 3; + + public MockedSpecCompiler(Config config) { + super(config); + } + + @Override + public Dag<JobExecutionPlan> compileFlow(Spec spec) { + List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); + + long flowExecutionId = System.currentTimeMillis(); + + int i = 0; + while(i++ < NUMBER_OF_JOBS) { + String specUri = "/foo/bar/spec/" + i; + Properties properties = new Properties(); + properties.put(ConfigurationKeys.FLOW_NAME_KEY, ((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_NAME_KEY)); + properties.put(ConfigurationKeys.FLOW_GROUP_KEY, ((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_GROUP_KEY)); + properties.put(ConfigurationKeys.JOB_NAME_KEY, ((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_NAME_KEY) + "_" + i); + properties.put(ConfigurationKeys.JOB_GROUP_KEY, ((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_GROUP_KEY) + "_" + i); + properties.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId); + JobSpec jobSpec = JobSpec.builder(specUri) + .withConfig(ConfigUtils.propertiesToConfig(properties)) + .withVersion("1") + .withDescription("Spec Description") + .build(); + jobExecutionPlans.add(new JobExecutionPlan(jobSpec, new InMemorySpecExecutor(ConfigFactory.empty()))); + } + + return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java index 3ce3b70..53cdf83 100644 --- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java @@ -37,6 +37,8 @@ public interface SpecCompiler extends SpecCatalogListener, Instrumentable { /*** * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s * and the mapping to {@link SpecExecutor} that they can be run on. + * All the specs generated from the compileFlow must have a + * {@link org.apache.gobblin.configuration.ConfigurationKeys#FLOW_EXECUTION_ID_KEY}. * @param spec {@link Spec} to compile. * @return Map of materialized physical {@link Spec} and {@link SpecExecutor}. */ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 4959b1a..1f7f737 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Maps; import javax.annotation.Nonnull; import com.codahale.metrics.Meter; @@ -33,12 +35,16 @@ import com.google.common.base.Optional; import com.typesafe.config.Config; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.Instrumentable; import org.apache.gobblin.metrics.MetricContext; import 
org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.service.modules.flow.SpecCompiler; +import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecCatalogListener; @@ -71,6 +77,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { protected final Optional<TopologyCatalog> topologyCatalog; protected final MetricContext metricContext; + protected final Optional<EventSubmitter> eventSubmitter; @Getter private Optional<Meter> flowOrchestrationSuccessFulMeter; @Getter @@ -86,13 +93,15 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class); this.flowOrchestrationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER)); this.flowOrchestrationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER)); - this.flowOrchestrationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER)); + this.flowOrchestrationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER)); + this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build()); } else { this.metricContext = null; this.flowOrchestrationSuccessFulMeter = Optional.absent(); this.flowOrchestrationFailedMeter = Optional.absent(); this.flowOrchestrationTimer = Optional.absent(); + this.eventSubmitter = Optional.absent(); } this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class); @@ -188,13 +197,25 @@ public class Orchestrator implements 
SpecCatalogListener, Instrumentable { long startTime = System.nanoTime(); if (spec instanceof FlowSpec) { + Map<String, String> flowMetadata = getFlowMetadata((FlowSpec) spec); + TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent() + ? this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED) + : null; Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec); if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) { + Instrumented.markMeter(this.flowOrchestrationFailedMeter); _log.warn("Cannot determine an executor to run on for Spec: " + spec); return; } + flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, + jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); + + if (flowCompilationTimer != null) { + flowCompilationTimer.stop(flowMetadata); + } + // Schedule all compiled JobSpecs on their respective Executor for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) { JobExecutionPlan jobExecutionPlan = dagNode.getValue(); @@ -205,8 +226,22 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { producer = jobExecutionPlan.getSpecExecutor().getProducer().get(); Spec jobSpec = jobExecutionPlan.getJobSpec(); + if (!((JobSpec)jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { + _log.warn("JobSpec does not contain flowExecutionId."); + } + + Map<String, String> jobMetadata = getJobMetadata(flowMetadata, jobExecutionPlan); _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer)); + + TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() + ? 
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) + : null; + producer.addSpec(jobSpec); + + if (jobOrchestrationTimer != null) { + jobOrchestrationTimer.stop(jobMetadata); + } } catch(Exception e) { _log.error("Cannot successfully setup spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer + " for flow: " + spec, e); @@ -220,6 +255,34 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } + private Map<String,String> getFlowMetadata(FlowSpec flowSpec) { + Map<String, String> metadata = Maps.newHashMap(); + + metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY)); + metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY)); + if (metadata.containsKey(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)) { + metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); + } + + return metadata; + } + + private Map<String,String> getJobMetadata(Map<String,String> flowMetadata, JobExecutionPlan jobExecutionPlan) { + Map<String, String> jobMetadata = Maps.newHashMap(); + JobSpec jobSpec = jobExecutionPlan.getJobSpec(); + SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor(); + + jobMetadata.putAll(flowMetadata); + jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY)); + jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY)); + jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); + 
jobMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY)); + jobMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY)); + jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getClass().getCanonicalName()); + + return jobMetadata; + } + public void remove(Spec spec, Properties headers) { // TODO: Evolve logic to cache and reuse previously compiled JobSpecs // .. this will work for Identity compiler but not always for multi-hop.
