scwhittle commented on code in PR #31133:
URL: https://github.com/apache/beam/pull/31133#discussion_r1592106130
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -797,9 +806,30 @@ private StreamingDataflowWorker makeWorker(
Supplier<Instant> clock,
Function<String, ScheduledExecutorService> executorSupplier,
int localRetryTimeoutMs) {
+ return makeWorker(
+ ImmutableMap.of(),
+ instructions,
+ options,
+ publishCounters,
+ clock,
+ executorSupplier,
+ localRetryTimeoutMs);
+ }
+
+ private StreamingDataflowWorker makeWorker(
Review Comment:
There are now 6 makeWorker overloads; an AutoValue builder in this test would
make it more readable and avoid duplicating the defaults for the various params.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -102,49 +112,112 @@ public static StreamingEngineComputationConfigFetcher
forTesting(
hasReceivedGlobalConfig,
globalConfigRefreshPeriodMillis,
dataflowServiceClient,
- executorSupplier.apply(GLOBAL_PIPELINE_CONFIG_REFRESHER),
+ executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME),
onStreamingConfig);
}
- private static BackOff defaultConfigBackoff() {
- return FluentBackoff.DEFAULT
- .withInitialBackoff(Duration.millis(100))
- .withMaxBackoff(Duration.standardMinutes(1))
- .withMaxCumulativeBackoff(Duration.standardMinutes(5))
- .backoff();
- }
-
- private MapTask createMapTask(StreamingComputationConfig computationConfig) {
+ @VisibleForTesting
+ static MapTask createMapTask(StreamingComputationConfig computationConfig) {
return new MapTask()
.setSystemName(computationConfig.getSystemName())
.setStageName(computationConfig.getStageName())
.setInstructions(computationConfig.getInstructions());
}
+ private static Optional<StreamingConfigTask> fetchConfigWithRetry(
+ ThrowingFetchWorkItemFn fetchWorkItemFn) {
+ BackOff backoff = BACKOFF_FACTORY.backoff();
+ while (true) {
+ try {
+ return fetchWorkItemFn
+ .fetchWorkItem()
+ .map(
+ workItem -> {
+ StreamingConfigTask config =
workItem.getStreamingConfigTask();
+ Preconditions.checkState(
+ config != null,
+ "Received invalid WorkItem without StreamingConfigTask.
WorkItem={}",
+ workItem);
+ return config;
+ });
+ } catch (IllegalArgumentException | IOException e) {
+ LOG.warn("Error fetching config: ", e);
+ try {
+ if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
+ return Optional.empty();
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Error backing off, will not retry: ", ioe);
+ return Optional.empty();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return Optional.empty();
+ }
+ }
+ }
+ }
+
+ private static Optional<StreamingEnginePipelineConfig> createPipelineConfig(
Review Comment:
I don't think this needs to return an Optional.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ComputationStateCacheTest {
+ private final BoundedQueueExecutor workExecutor =
mock(BoundedQueueExecutor.class);
+ private final WindmillStateCache.ForComputation stateCache =
+ mock(WindmillStateCache.ForComputation.class);
+ private final ComputationConfig.Fetcher configFetcher =
mock(ComputationConfig.Fetcher.class);
+ private ComputationStateCache computationStateCache;
+
+ private static Work createWork(long workToken, long cacheToken) {
+
Review Comment:
nit: remove the blank line at the start of this method body.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toMap;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingApplianceComputationConfigFetcherTest {
+ private final WindmillServerStub mockWindmillServer =
mock(WindmillServerStub.class);
+
+ @Test
+ public void testGetComputationConfig() throws IOException {
+ List<Windmill.GetConfigResponse.NameMapEntry> nameMapEntries =
+ ImmutableList.of(
+ Windmill.GetConfigResponse.NameMapEntry.newBuilder()
+ .setUserName("userName1")
+ .setSystemName("systemName1")
+ .build(),
+ Windmill.GetConfigResponse.NameMapEntry.newBuilder()
+ .setUserName("userName2")
+ .setSystemName("userName2")
+ .build());
+ String serializedMapTask =
+ Transport.getJsonFactory()
+ .toString(
+ new MapTask()
+ .setSystemName("systemName")
+ .setStageName("stageName")
+ .setInstructions(ImmutableList.of()));
+ Windmill.GetConfigResponse getConfigResponse =
+ Windmill.GetConfigResponse.newBuilder()
+ .addAllNameMap(nameMapEntries)
+ .addComputationConfigMap(
+
Windmill.GetConfigResponse.ComputationConfigMapEntry.newBuilder()
+ .setComputationId("systemName")
+ .setComputationConfig(
+ Windmill.ComputationConfig.newBuilder()
+ .addTransformUserNameToStateFamily(
+
Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry
+ .newBuilder()
+ .setStateFamily("stateFamilyName")
+ .setTransformUserName("transformUserName")
+ .build())
+ .build())
+ .build())
+ .addCloudWorks(serializedMapTask)
+ .build();
+ ComputationConfig expectedConfig =
+ ComputationConfig.create(
+ Transport.getJsonFactory().fromString(serializedMapTask,
MapTask.class),
+ getConfigResponse.getComputationConfigMapList().stream()
+
.map(Windmill.GetConfigResponse.ComputationConfigMapEntry::getComputationConfig)
+ .flatMap(
+ computationConfig ->
+
computationConfig.getTransformUserNameToStateFamilyList().stream())
+ .collect(
+ toMap(
+
Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry
+ ::getTransformUserName,
+
Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry
+ ::getStateFamily)),
+ nameMapEntries.stream()
+ .collect(
+ toMap(
+ Windmill.GetConfigResponse.NameMapEntry::getUserName,
+
Windmill.GetConfigResponse.NameMapEntry::getSystemName)));
+ StreamingApplianceComputationConfigFetcher configLoader =
+ createStreamingApplianceConfigLoader();
+ when(mockWindmillServer.getConfig(any())).thenReturn(getConfigResponse);
+ Optional<ComputationConfig> config =
configLoader.fetchConfig("someComputationId");
+ assertTrue(config.isPresent());
+ assertThat(config.get()).isEqualTo(expectedConfig);
+ }
+
+ @Test
+ public void testGetComputationConfig_whenNoConfigReturned() {
+ StreamingApplianceComputationConfigFetcher configLoader =
+ createStreamingApplianceConfigLoader();
+ when(mockWindmillServer.getConfig(any())).thenReturn(null);
+ Optional<ComputationConfig> configResponse =
configLoader.fetchConfig("someComputationId");
+ assertFalse(configResponse.isPresent());
+ }
+
Review Comment:
Add a test that an empty computation throws an exception.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingEngineComputationConfigFetcherTest {
+ private final WorkUnitClient mockDataflowServiceClient =
mock(WorkUnitClient.class);
+ private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
+
+ private StreamingEngineComputationConfigFetcher createConfigLoader(
+ boolean waitForInitialConfig,
+ long globalConfigRefreshPeriod,
+ Consumer<StreamingEnginePipelineConfig> onPipelineConfig) {
+ return StreamingEngineComputationConfigFetcher.forTesting(
+ !waitForInitialConfig,
+ globalConfigRefreshPeriod,
+ mockDataflowServiceClient,
+ ignored -> Executors.newSingleThreadScheduledExecutor(),
+ onPipelineConfig);
+ }
+
+ @After
+ public void cleanUp() {
+ streamingEngineConfigFetcher.stop();
+ }
+
+ @Test
+ public void testStart_requiresInitialConfig() throws IOException,
InterruptedException {
+ WorkItem initialConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(10L));
+ CountDownLatch waitForInitialConfig = new CountDownLatch(1);
+ Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new
HashSet<>();
+ when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
+ .thenReturn(Optional.of(initialConfig));
+ streamingEngineConfigFetcher =
+ createConfigLoader(
+ /* waitForInitialConfig= */ true,
+ 0,
+ config -> {
+ try {
+ receivedPipelineConfig.add(config);
+ waitForInitialConfig.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ Thread asyncStartConfigLoader = new
Thread(streamingEngineConfigFetcher::start);
+ asyncStartConfigLoader.start();
+ waitForInitialConfig.countDown();
+ asyncStartConfigLoader.join();
+ assertThat(receivedPipelineConfig)
+ .containsExactly(
+ StreamingEnginePipelineConfig.builder()
+ .setMaxWorkItemCommitBytes(
+
initialConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ .build());
+ }
+
+ @Test
+ public void testStart_startsPeriodicConfigRequests() throws IOException,
InterruptedException {
+ WorkItem firstConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(10L));
+ WorkItem secondConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(15L));
+ WorkItem thirdConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(100L));
+ CountDownLatch numExpectedRefreshes = new CountDownLatch(3);
+ Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new
HashSet<>();
+ when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
+ .thenReturn(Optional.of(firstConfig))
+ .thenReturn(Optional.of(secondConfig))
+ .thenReturn(Optional.of(thirdConfig));
+
+ streamingEngineConfigFetcher =
+ createConfigLoader(
+ /* waitForInitialConfig= */ true,
+ Duration.millis(100).getMillis(),
+ config -> {
+ receivedPipelineConfig.add(config);
+ numExpectedRefreshes.countDown();
+ });
+
+ Thread asyncStartConfigLoader = new
Thread(streamingEngineConfigFetcher::start);
+ asyncStartConfigLoader.start();
+ numExpectedRefreshes.await();
+ asyncStartConfigLoader.join();
+ assertThat(receivedPipelineConfig)
Review Comment:
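It seems safer to stop the fetcher's background thread before accessing the
received config (or to use a concurrent set), since receivedPipelineConfig is a
plain HashSet that is modified from the config callback.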
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCache.java:
##########
@@ -54,6 +56,7 @@ public final class ComputationStateCache implements
StatusDataProvider {
private static final Logger LOG =
LoggerFactory.getLogger(ComputationStateCache.class);
+ private final ConcurrentMap<String, String>
globalUsernameToStateFamilyNameMap;
Review Comment:
nit: "global" has connotations and makes this sound like a static global; how
about pipelineUserNameToStateFamilyNameMap?
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ComputationStateCacheTest {
+ private final BoundedQueueExecutor workExecutor =
mock(BoundedQueueExecutor.class);
+ private final WindmillStateCache.ForComputation stateCache =
+ mock(WindmillStateCache.ForComputation.class);
+ private final ComputationConfig.Fetcher configFetcher =
mock(ComputationConfig.Fetcher.class);
+ private ComputationStateCache computationStateCache;
+
+ private static Work createWork(long workToken, long cacheToken) {
+
+ return Work.create(
+ Windmill.WorkItem.newBuilder()
+ .setKey(ByteString.copyFromUtf8(""))
+ .setShardingKey(1)
+ .setWorkToken(workToken)
+ .setCacheToken(cacheToken)
+ .build(),
+ Instant::now,
+ Collections.emptyList(),
+ unused -> {});
+ }
+
+ @Before
+ public void setUp() {
+ computationStateCache =
+ ComputationStateCache.create(
+ configFetcher, workExecutor, ignored -> stateCache,
IdGenerators.decrementingLongs());
+ }
+
+ @Test
+ public void testGet_computationStateCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ computationStateCache.get(computationId);
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGet_computationStateNotCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGet_computationStateNotCachedOrFetchable() {
+ String computationId = "computationId";
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.empty());
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertFalse(computationState.isPresent());
+ }
+
+ @Test
+ public void testGet_usesUserTransformToStateFamilyNameIfNotEmpty() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(
+ mapTask, userTransformToStateFamilyName,
ImmutableMap.of("stateName1", "stateName"));
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void
testGet_defaultsToStateNameMapWhenUserTransformToStateFamilyNameEmpty() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(
+ mapTask, ImmutableMap.of(), ImmutableMap.of("stateName1",
"stateName"));
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+
.isEqualTo(computationStateCache.getGlobalUsernameToStateFamilyNameMap());
+ }
+
+ @Test
+ public void testGet_buildsStateNameMap() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> stateNameMap = ImmutableMap.of("stateName1",
"stateName");
+ ComputationConfig computationConfig = ComputationConfig.create(mapTask,
null, stateNameMap);
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ computationStateCache.get(computationId);
+ assertThat(computationStateCache.getGlobalUsernameToStateFamilyNameMap())
+ .containsExactlyEntriesIn(stateNameMap);
+ }
+
+ @Test
+ public void testGetIfPresent_computationStateCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ computationStateCache.get(computationId);
+ Optional<ComputationState> computationState =
computationStateCache.getIfPresent(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGetIfPresent_computationStateNotCached() {
+ Optional<ComputationState> computationState =
+ computationStateCache.getIfPresent("computationId");
+ assertFalse(computationState.isPresent());
+ verifyNoInteractions(configFetcher);
+ }
+
+ @Test
+ public void testGetAllComputations() {
Review Comment:
nit: naming, use AllPresentComputations (e.g. testGetAllPresentComputations).
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ComputationStateCacheTest {
+ private final BoundedQueueExecutor workExecutor =
mock(BoundedQueueExecutor.class);
+ private final WindmillStateCache.ForComputation stateCache =
+ mock(WindmillStateCache.ForComputation.class);
+ private final ComputationConfig.Fetcher configFetcher =
mock(ComputationConfig.Fetcher.class);
+ private ComputationStateCache computationStateCache;
+
+ private static Work createWork(long workToken, long cacheToken) {
+
+ return Work.create(
+ Windmill.WorkItem.newBuilder()
+ .setKey(ByteString.copyFromUtf8(""))
+ .setShardingKey(1)
+ .setWorkToken(workToken)
+ .setCacheToken(cacheToken)
+ .build(),
+ Instant::now,
+ Collections.emptyList(),
+ unused -> {});
+ }
+
+ @Before
+ public void setUp() {
+ computationStateCache =
+ ComputationStateCache.create(
+ configFetcher, workExecutor, ignored -> stateCache,
IdGenerators.decrementingLongs());
+ }
+
+ @Test
+ public void testGet_computationStateCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ computationStateCache.get(computationId);
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGet_computationStateNotCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGet_computationStateNotCachedOrFetchable() {
+ String computationId = "computationId";
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.empty());
Review Comment:
Add a similar test where fetchConfig throws an exception.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ComputationStateCacheTest {
+ private final BoundedQueueExecutor workExecutor =
mock(BoundedQueueExecutor.class);
+ private final WindmillStateCache.ForComputation stateCache =
+ mock(WindmillStateCache.ForComputation.class);
+ private final ComputationConfig.Fetcher configFetcher =
mock(ComputationConfig.Fetcher.class);
+ private ComputationStateCache computationStateCache;
+
+ private static Work createWork(long workToken, long cacheToken) {
+
+ return Work.create(
+ Windmill.WorkItem.newBuilder()
+ .setKey(ByteString.copyFromUtf8(""))
+ .setShardingKey(1)
+ .setWorkToken(workToken)
+ .setCacheToken(cacheToken)
+ .build(),
+ Instant::now,
+ Collections.emptyList(),
+ unused -> {});
+ }
+
+ @Before
+ public void setUp() {
+ computationStateCache =
+ ComputationStateCache.create(
+ configFetcher, workExecutor, ignored -> stateCache,
IdGenerators.decrementingLongs());
+ }
+
+ @Test
+ public void testGet_computationStateCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ computationStateCache.get(computationId);
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGet_computationStateNotCached() {
Review Comment:
How about combining the test above and this one by just verifying the results
of both of the get calls above?
If you feel strongly about them being separate, I think this one should come
first since it's doing less.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ComputationStateCacheTest {
+ private final BoundedQueueExecutor workExecutor =
mock(BoundedQueueExecutor.class);
+ private final WindmillStateCache.ForComputation stateCache =
+ mock(WindmillStateCache.ForComputation.class);
+ private final ComputationConfig.Fetcher configFetcher =
mock(ComputationConfig.Fetcher.class);
+ private ComputationStateCache computationStateCache;
+
+ private static Work createWork(long workToken, long cacheToken) {
+
+ return Work.create(
+ Windmill.WorkItem.newBuilder()
+ .setKey(ByteString.copyFromUtf8(""))
+ .setShardingKey(1)
+ .setWorkToken(workToken)
+ .setCacheToken(cacheToken)
+ .build(),
+ Instant::now,
+ Collections.emptyList(),
+ unused -> {});
+ }
+
+ @Before
+ public void setUp() {
+ computationStateCache =
+ ComputationStateCache.create(
+ configFetcher, workExecutor, ignored -> stateCache,
IdGenerators.decrementingLongs());
+ }
+
+ @Test
+ public void testGet_computationStateCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ computationStateCache.get(computationId);
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGet_computationStateNotCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGet_computationStateNotCachedOrFetchable() {
+ String computationId = "computationId";
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.empty());
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertFalse(computationState.isPresent());
Review Comment:
call get again and verify the fetch occurs
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ComputationStateCacheTest {
+ private final BoundedQueueExecutor workExecutor =
mock(BoundedQueueExecutor.class);
+ private final WindmillStateCache.ForComputation stateCache =
+ mock(WindmillStateCache.ForComputation.class);
+ private final ComputationConfig.Fetcher configFetcher =
mock(ComputationConfig.Fetcher.class);
+ private ComputationStateCache computationStateCache;
+
+ private static Work createWork(long workToken, long cacheToken) {
+
+ return Work.create(
+ Windmill.WorkItem.newBuilder()
+ .setKey(ByteString.copyFromUtf8(""))
+ .setShardingKey(1)
+ .setWorkToken(workToken)
+ .setCacheToken(cacheToken)
+ .build(),
+ Instant::now,
+ Collections.emptyList(),
+ unused -> {});
+ }
+
+ @Before
+ public void setUp() {
+ computationStateCache =
+ ComputationStateCache.create(
+ configFetcher, workExecutor, ignored -> stateCache,
IdGenerators.decrementingLongs());
+ }
+
+ @Test
+ public void testGet_computationStateCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ computationStateCache.get(computationId);
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGet_computationStateNotCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGet_computationStateNotCachedOrFetchable() {
+ String computationId = "computationId";
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.empty());
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertFalse(computationState.isPresent());
+ }
+
+ @Test
+ public void testGet_usesUserTransformToStateFamilyNameIfNotEmpty() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(
+ mapTask, userTransformToStateFamilyName,
ImmutableMap.of("stateName1", "stateName"));
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void
testGet_defaultsToStateNameMapWhenUserTransformToStateFamilyNameEmpty() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(
+ mapTask, ImmutableMap.of(), ImmutableMap.of("stateName1",
"stateName"));
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+
.isEqualTo(computationStateCache.getGlobalUsernameToStateFamilyNameMap());
+ }
+
+ @Test
+ public void testGet_buildsStateNameMap() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> stateNameMap = ImmutableMap.of("stateName1",
"stateName");
+ ComputationConfig computationConfig = ComputationConfig.create(mapTask,
null, stateNameMap);
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ computationStateCache.get(computationId);
+ assertThat(computationStateCache.getGlobalUsernameToStateFamilyNameMap())
+ .containsExactlyEntriesIn(stateNameMap);
+ }
+
+ @Test
+ public void testGetIfPresent_computationStateCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ computationStateCache.get(computationId);
+ Optional<ComputationState> computationState =
computationStateCache.getIfPresent(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGetIfPresent_computationStateNotCached() {
+ Optional<ComputationState> computationState =
+ computationStateCache.getIfPresent("computationId");
+ assertFalse(computationState.isPresent());
+ verifyNoInteractions(configFetcher);
+ }
+
+ @Test
+ public void testGetAllComputations() {
+ String computationId1 = "computationId1";
+ String computationId2 = "computationId2";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId1))).thenReturn(Optional.of(computationConfig));
+
when(configFetcher.fetchConfig(eq(computationId2))).thenReturn(Optional.of(computationConfig));
+
+ computationStateCache.get(computationId1);
+ computationStateCache.get(computationId2);
+ Set<String> computationIds = ImmutableSet.of(computationId1,
computationId2);
+ computationStateCache
+ .getAllPresentComputations()
+ .forEach(
+ computationState -> {
+
assertThat(computationIds).contains(computationState.getComputationId());
+ assertThat(computationState.getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ });
+ }
+
+ @Test
+ public void testTotalCurrentActiveGetWorkBudget() {
Review Comment:
It would be better to test with multiple ComputationStates that it has to
aggregate over.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingEngineComputationConfigFetcherTest {
+ private final WorkUnitClient mockDataflowServiceClient =
mock(WorkUnitClient.class);
+ private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
+
+ private StreamingEngineComputationConfigFetcher createConfigLoader(
+ boolean waitForInitialConfig,
+ long globalConfigRefreshPeriod,
+ Consumer<StreamingEnginePipelineConfig> onPipelineConfig) {
+ return StreamingEngineComputationConfigFetcher.forTesting(
+ !waitForInitialConfig,
+ globalConfigRefreshPeriod,
+ mockDataflowServiceClient,
+ ignored -> Executors.newSingleThreadScheduledExecutor(),
Review Comment:
just move into forTesting method?
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ComputationStateCacheTest {
+ private final BoundedQueueExecutor workExecutor =
mock(BoundedQueueExecutor.class);
+ private final WindmillStateCache.ForComputation stateCache =
+ mock(WindmillStateCache.ForComputation.class);
+ private final ComputationConfig.Fetcher configFetcher =
mock(ComputationConfig.Fetcher.class);
+ private ComputationStateCache computationStateCache;
+
+ private static Work createWork(long workToken, long cacheToken) {
+
+ return Work.create(
+ Windmill.WorkItem.newBuilder()
+ .setKey(ByteString.copyFromUtf8(""))
+ .setShardingKey(1)
+ .setWorkToken(workToken)
+ .setCacheToken(cacheToken)
+ .build(),
+ Instant::now,
+ Collections.emptyList(),
+ unused -> {});
+ }
+
+ @Before
+ public void setUp() {
+ computationStateCache =
+ ComputationStateCache.create(
+ configFetcher, workExecutor, ignored -> stateCache,
IdGenerators.decrementingLongs());
+ }
+
+ @Test
+ public void testGet_computationStateCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ computationStateCache.get(computationId);
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGet_computationStateNotCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGet_computationStateNotCachedOrFetchable() {
+ String computationId = "computationId";
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.empty());
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertFalse(computationState.isPresent());
+ }
+
+ @Test
+ public void testGet_usesUserTransformToStateFamilyNameIfNotEmpty() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(
+ mapTask, userTransformToStateFamilyName,
ImmutableMap.of("stateName1", "stateName"));
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void
testGet_defaultsToStateNameMapWhenUserTransformToStateFamilyNameEmpty() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(
+ mapTask, ImmutableMap.of(), ImmutableMap.of("stateName1",
"stateName"));
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ Optional<ComputationState> computationState =
computationStateCache.get(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+
.isEqualTo(computationStateCache.getGlobalUsernameToStateFamilyNameMap());
+ }
+
+ @Test
+ public void testGet_buildsStateNameMap() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> stateNameMap = ImmutableMap.of("stateName1",
"stateName");
+ ComputationConfig computationConfig = ComputationConfig.create(mapTask,
null, stateNameMap);
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ computationStateCache.get(computationId);
+ assertThat(computationStateCache.getGlobalUsernameToStateFamilyNameMap())
+ .containsExactlyEntriesIn(stateNameMap);
+ }
+
+ @Test
+ public void testGetIfPresent_computationStateCached() {
+ String computationId = "computationId";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId))).thenReturn(Optional.of(computationConfig));
+ computationStateCache.get(computationId);
+ Optional<ComputationState> computationState =
computationStateCache.getIfPresent(computationId);
+ assertTrue(computationState.isPresent());
+
assertThat(computationState.get().getComputationId()).isEqualTo(computationId);
+ assertThat(computationState.get().getMapTask()).isEqualTo(mapTask);
+ assertThat(computationState.get().getTransformUserNameToStateFamily())
+ .isEqualTo(userTransformToStateFamilyName);
+ }
+
+ @Test
+ public void testGetIfPresent_computationStateNotCached() {
+ Optional<ComputationState> computationState =
+ computationStateCache.getIfPresent("computationId");
+ assertFalse(computationState.isPresent());
+ verifyNoInteractions(configFetcher);
+ }
+
+ @Test
+ public void testGetAllComputations() {
+ String computationId1 = "computationId1";
+ String computationId2 = "computationId2";
+ MapTask mapTask = new
MapTask().setStageName("stageName").setSystemName("systemName");
+ Map<String, String> userTransformToStateFamilyName =
+ ImmutableMap.of("userTransformName", "stateFamilyName");
+ ComputationConfig computationConfig =
+ ComputationConfig.create(mapTask, userTransformToStateFamilyName,
ImmutableMap.of());
+
when(configFetcher.fetchConfig(eq(computationId1))).thenReturn(Optional.of(computationConfig));
+
when(configFetcher.fetchConfig(eq(computationId2))).thenReturn(Optional.of(computationConfig));
+
+ computationStateCache.get(computationId1);
+ computationStateCache.get(computationId2);
+ Set<String> computationIds = ImmutableSet.of(computationId1,
computationId2);
+ computationStateCache
+ .getAllPresentComputations()
+ .forEach(
+ computationState -> {
+
assertThat(computationIds).contains(computationState.getComputationId());
Review Comment:
As is, this would pass if getAllPresentComputations returned nothing.
Verify that it contains both computation ids.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toMap;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingApplianceComputationConfigFetcherTest {
+ private final WindmillServerStub mockWindmillServer =
mock(WindmillServerStub.class);
+
+ @Test
+ public void testGetComputationConfig() throws IOException {
+ List<Windmill.GetConfigResponse.NameMapEntry> nameMapEntries =
+ ImmutableList.of(
+ Windmill.GetConfigResponse.NameMapEntry.newBuilder()
+ .setUserName("userName1")
+ .setSystemName("systemName1")
+ .build(),
+ Windmill.GetConfigResponse.NameMapEntry.newBuilder()
+ .setUserName("userName2")
+ .setSystemName("userName2")
+ .build());
+ String serializedMapTask =
+ Transport.getJsonFactory()
+ .toString(
+ new MapTask()
+ .setSystemName("systemName")
+ .setStageName("stageName")
+ .setInstructions(ImmutableList.of()));
+ Windmill.GetConfigResponse getConfigResponse =
+ Windmill.GetConfigResponse.newBuilder()
+ .addAllNameMap(nameMapEntries)
+ .addComputationConfigMap(
+
Windmill.GetConfigResponse.ComputationConfigMapEntry.newBuilder()
+ .setComputationId("systemName")
+ .setComputationConfig(
+ Windmill.ComputationConfig.newBuilder()
+ .addTransformUserNameToStateFamily(
+
Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry
+ .newBuilder()
+ .setStateFamily("stateFamilyName")
+ .setTransformUserName("transformUserName")
+ .build())
+ .build())
+ .build())
+ .addCloudWorks(serializedMapTask)
+ .build();
+ ComputationConfig expectedConfig =
+ ComputationConfig.create(
+ Transport.getJsonFactory().fromString(serializedMapTask,
MapTask.class),
+ getConfigResponse.getComputationConfigMapList().stream()
+
.map(Windmill.GetConfigResponse.ComputationConfigMapEntry::getComputationConfig)
+ .flatMap(
+ computationConfig ->
+
computationConfig.getTransformUserNameToStateFamilyList().stream())
+ .collect(
+ toMap(
+
Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry
+ ::getTransformUserName,
+
Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry
+ ::getStateFamily)),
+ nameMapEntries.stream()
+ .collect(
+ toMap(
+ Windmill.GetConfigResponse.NameMapEntry::getUserName,
+
Windmill.GetConfigResponse.NameMapEntry::getSystemName)));
+ StreamingApplianceComputationConfigFetcher configLoader =
+ createStreamingApplianceConfigLoader();
+ when(mockWindmillServer.getConfig(any())).thenReturn(getConfigResponse);
+ Optional<ComputationConfig> config =
configLoader.fetchConfig("someComputationId");
+ assertTrue(config.isPresent());
+ assertThat(config.get()).isEqualTo(expectedConfig);
+ }
+
+ @Test
+ public void testGetComputationConfig_whenNoConfigReturned() {
+ StreamingApplianceComputationConfigFetcher configLoader =
+ createStreamingApplianceConfigLoader();
+ when(mockWindmillServer.getConfig(any())).thenReturn(null);
+ Optional<ComputationConfig> configResponse =
configLoader.fetchConfig("someComputationId");
+ assertFalse(configResponse.isPresent());
+ }
+
Review Comment:
Add a test that if the windmillfetcher throws an exception, it is propagated
to configLoader.fetchConfig.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingEngineComputationConfigFetcherTest {
+ private final WorkUnitClient mockDataflowServiceClient =
mock(WorkUnitClient.class);
+ private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
+
+ private StreamingEngineComputationConfigFetcher createConfigLoader(
+ boolean waitForInitialConfig,
+ long globalConfigRefreshPeriod,
+ Consumer<StreamingEnginePipelineConfig> onPipelineConfig) {
+ return StreamingEngineComputationConfigFetcher.forTesting(
+ !waitForInitialConfig,
+ globalConfigRefreshPeriod,
+ mockDataflowServiceClient,
+ ignored -> Executors.newSingleThreadScheduledExecutor(),
+ onPipelineConfig);
+ }
+
+ @After
+ public void cleanUp() {
+ streamingEngineConfigFetcher.stop();
+ }
+
+ @Test
+ public void testStart_requiresInitialConfig() throws IOException,
InterruptedException {
+ WorkItem initialConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(10L));
+ CountDownLatch waitForInitialConfig = new CountDownLatch(1);
+ Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new
HashSet<>();
+ when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
+ .thenReturn(Optional.of(initialConfig));
+ streamingEngineConfigFetcher =
+ createConfigLoader(
+ /* waitForInitialConfig= */ true,
+ 0,
+ config -> {
+ try {
+ receivedPipelineConfig.add(config);
+ waitForInitialConfig.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ Thread asyncStartConfigLoader = new
Thread(streamingEngineConfigFetcher::start);
+ asyncStartConfigLoader.start();
+ waitForInitialConfig.countDown();
+ asyncStartConfigLoader.join();
+ assertThat(receivedPipelineConfig)
+ .containsExactly(
+ StreamingEnginePipelineConfig.builder()
+ .setMaxWorkItemCommitBytes(
+
initialConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ .build());
+ }
+
+ @Test
+ public void testStart_startsPeriodicConfigRequests() throws IOException,
InterruptedException {
+ WorkItem firstConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(10L));
+ WorkItem secondConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(15L));
+ WorkItem thirdConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(100L));
+ CountDownLatch numExpectedRefreshes = new CountDownLatch(3);
+ Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new
HashSet<>();
+ when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
+ .thenReturn(Optional.of(firstConfig))
+ .thenReturn(Optional.of(secondConfig))
+ .thenReturn(Optional.of(thirdConfig));
Review Comment:
Will there be a flaky error if a 4th fetch comes in before the background
thread stops? Should some sort of default response be set up?
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingEngineComputationConfigFetcherTest {
+ private final WorkUnitClient mockDataflowServiceClient =
mock(WorkUnitClient.class);
+ private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
+
+ private StreamingEngineComputationConfigFetcher createConfigLoader(
Review Comment:
nit: s/Loader/Fetcher/
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingEngineComputationConfigFetcherTest {
+ private final WorkUnitClient mockDataflowServiceClient =
mock(WorkUnitClient.class);
+ private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
+
+ private StreamingEngineComputationConfigFetcher createConfigLoader(
+ boolean waitForInitialConfig,
+ long globalConfigRefreshPeriod,
+ Consumer<StreamingEnginePipelineConfig> onPipelineConfig) {
+ return StreamingEngineComputationConfigFetcher.forTesting(
+ !waitForInitialConfig,
+ globalConfigRefreshPeriod,
+ mockDataflowServiceClient,
+ ignored -> Executors.newSingleThreadScheduledExecutor(),
+ onPipelineConfig);
+ }
+
+ @After
+ public void cleanUp() {
+ streamingEngineConfigFetcher.stop();
+ }
+
+ @Test
+ public void testStart_requiresInitialConfig() throws IOException,
InterruptedException {
+ WorkItem initialConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(10L));
+ CountDownLatch waitForInitialConfig = new CountDownLatch(1);
+ Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new
HashSet<>();
+ when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
+ .thenReturn(Optional.of(initialConfig));
+ streamingEngineConfigFetcher =
+ createConfigLoader(
+ /* waitForInitialConfig= */ true,
+ 0,
+ config -> {
+ try {
+ receivedPipelineConfig.add(config);
+ waitForInitialConfig.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ Thread asyncStartConfigLoader = new
Thread(streamingEngineConfigFetcher::start);
+ asyncStartConfigLoader.start();
+ waitForInitialConfig.countDown();
+ asyncStartConfigLoader.join();
+ assertThat(receivedPipelineConfig)
+ .containsExactly(
+ StreamingEnginePipelineConfig.builder()
+ .setMaxWorkItemCommitBytes(
+
initialConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ .build());
+ }
+
+ @Test
+ public void testStart_startsPeriodicConfigRequests() throws IOException,
InterruptedException {
+ WorkItem firstConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(10L));
+ WorkItem secondConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(15L));
+ WorkItem thirdConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(100L));
+ CountDownLatch numExpectedRefreshes = new CountDownLatch(3);
+ Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new
HashSet<>();
+ when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
+ .thenReturn(Optional.of(firstConfig))
+ .thenReturn(Optional.of(secondConfig))
Review Comment:
Test that if an exception is thrown in the middle, later fetches still
work.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingEngineComputationConfigFetcherTest {
+ private final WorkUnitClient mockDataflowServiceClient =
mock(WorkUnitClient.class);
+ private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
+
+ private StreamingEngineComputationConfigFetcher createConfigLoader(
+ boolean waitForInitialConfig,
+ long globalConfigRefreshPeriod,
+ Consumer<StreamingEnginePipelineConfig> onPipelineConfig) {
+ return StreamingEngineComputationConfigFetcher.forTesting(
+ !waitForInitialConfig,
+ globalConfigRefreshPeriod,
+ mockDataflowServiceClient,
+ ignored -> Executors.newSingleThreadScheduledExecutor(),
+ onPipelineConfig);
+ }
+
+ @After
+ public void cleanUp() {
+ streamingEngineConfigFetcher.stop();
+ }
+
+ @Test
+ public void testStart_requiresInitialConfig() throws IOException,
InterruptedException {
+ WorkItem initialConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(10L));
+ CountDownLatch waitForInitialConfig = new CountDownLatch(1);
+ Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new
HashSet<>();
+ when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
+ .thenReturn(Optional.of(initialConfig));
+ streamingEngineConfigFetcher =
+ createConfigLoader(
+ /* waitForInitialConfig= */ true,
+ 0,
+ config -> {
+ try {
+ receivedPipelineConfig.add(config);
+ waitForInitialConfig.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ Thread asyncStartConfigLoader = new
Thread(streamingEngineConfigFetcher::start);
+ asyncStartConfigLoader.start();
+ waitForInitialConfig.countDown();
+ asyncStartConfigLoader.join();
+ assertThat(receivedPipelineConfig)
+ .containsExactly(
+ StreamingEnginePipelineConfig.builder()
+ .setMaxWorkItemCommitBytes(
+
initialConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ .build());
+ }
+
+ @Test
+ public void testStart_startsPeriodicConfigRequests() throws IOException,
InterruptedException {
+ WorkItem firstConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(10L));
+ WorkItem secondConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(15L));
+ WorkItem thirdConfig =
+ new WorkItem()
+ .setJobId("job")
+ .setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(100L));
+ CountDownLatch numExpectedRefreshes = new CountDownLatch(3);
+ Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new
HashSet<>();
+ when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
+ .thenReturn(Optional.of(firstConfig))
+ .thenReturn(Optional.of(secondConfig))
+ .thenReturn(Optional.of(thirdConfig));
+
+ streamingEngineConfigFetcher =
+ createConfigLoader(
+ /* waitForInitialConfig= */ true,
+ Duration.millis(100).getMillis(),
+ config -> {
+ receivedPipelineConfig.add(config);
+ numExpectedRefreshes.countDown();
+ });
+
+ Thread asyncStartConfigLoader = new
Thread(streamingEngineConfigFetcher::start);
+ asyncStartConfigLoader.start();
+ numExpectedRefreshes.await();
+ asyncStartConfigLoader.join();
+ assertThat(receivedPipelineConfig)
+ .containsExactly(
+ StreamingEnginePipelineConfig.builder()
+ .setMaxWorkItemCommitBytes(
+
firstConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ .build(),
+ StreamingEnginePipelineConfig.builder()
+ .setMaxWorkItemCommitBytes(
+
secondConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ .build(),
+ StreamingEnginePipelineConfig.builder()
+ .setMaxWorkItemCommitBytes(
+
thirdConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ .build());
+ }
+
+ @Test
+ public void testGetComputationConfig() throws IOException {
+ streamingEngineConfigFetcher =
+ createConfigLoader(/* waitForInitialConfig= */ false, 0, ignored ->
{});
+ String computationId = "computationId";
+ String stageName = "stageName";
+ String systemName = "systemName";
+ StreamingComputationConfig pipelineConfig =
+ new StreamingComputationConfig()
+ .setComputationId(computationId)
+ .setStageName(stageName)
+ .setSystemName(systemName)
+ .setInstructions(ImmutableList.of());
+
+ WorkItem workItem =
+ new WorkItem()
+ .setStreamingConfigTask(
+ new StreamingConfigTask()
+
.setStreamingComputationConfigs(ImmutableList.of(pipelineConfig)));
+
+ when(mockDataflowServiceClient.getStreamingConfigWorkItem(anyString()))
+ .thenReturn(Optional.of(workItem));
+ Optional<ComputationConfig> actualPipelineConfig =
+ streamingEngineConfigFetcher.fetchConfig(computationId);
+
+ assertTrue(actualPipelineConfig.isPresent());
+ assertThat(actualPipelineConfig.get())
+ .isEqualTo(
+ ComputationConfig.create(
+
StreamingEngineComputationConfigFetcher.createMapTask(pipelineConfig),
+ ImmutableMap.of(),
+ ImmutableMap.of()));
+ }
+
+ @Test
+ public void testGetComputationConfig_noComputationPresent() throws
IOException {
Review Comment:
Test the dataflow service client returning an exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]