sjvanrossum commented on PR #36099:
URL: https://github.com/apache/beam/pull/36099#issuecomment-3306131463

   Here's a sample for the `FileSystemConfigProvider`:
   ```
   package org.apache.beam.sdk.extensions.kafka.config;
   
   import java.io.IOException;
   import java.nio.channels.ReadableByteChannel;
   import java.nio.channels.WritableByteChannel;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.nio.file.StandardOpenOption;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.TimeUnit;
   import java.util.stream.Collectors;
   import org.apache.beam.sdk.io.FileSystems;
   import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
   import org.apache.beam.sdk.io.fs.MatchResult;
   import org.apache.beam.sdk.io.fs.ResourceId;
   import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
   import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
   import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
   import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
   import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
   import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures;
   import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListenableFuture;
   import org.apache.kafka.common.config.AbstractConfig;
   import org.apache.kafka.common.config.ConfigData;
   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.common.config.provider.ConfigProvider;
   
   /**
    * Kafka {@link ConfigProvider} that resolves keys to local copies of files reachable through
    * Beam's {@link FileSystems}.
    *
    * <p>For {@link #get(String, Set)} the {@code path} argument is used as the file system scheme
    * and each key as the scheme-specific part, i.e. the file spec looked up is
    * {@code path + ":" + key}. Each matched file is copied into a temporary file and the value
    * returned for the key is the path of that temporary file.
    *
    * <p>Copies are cached per {@code (ttl.ms, scheme)} pair in a JVM-wide cache. When
    * {@code ttl.ms} is greater than zero the per-scheme cache is built with
    * {@code refreshAfterWrite}, and a refresh rewrites the existing temporary file in place.
    */
   public final class FileSystemConfigProvider implements ConfigProvider {
     /** Configuration accepted by {@link FileSystemConfigProvider#configure(Map)}. */
     static class Config extends AbstractConfig {
       /** Name of the refresh interval setting, in milliseconds; defaults to 0 and must be >= 0. */
       static final String TTL_MS_CONFIG = "ttl.ms";

       static final ConfigDef CONFIG =
           new ConfigDef()
               .define(
                   TTL_MS_CONFIG,
                   ConfigDef.Type.LONG,
                   0L,
                   ConfigDef.Range.atLeast(0L),
                   ConfigDef.Importance.LOW,
                   "TTL in milliseconds for temporary files.");

       Config(Map<?, ?> props) {
         super(CONFIG, props);
       }
     }

     /**
      * Outer cache keyed by the two-element list {@code [ttlMillis, scheme]}. Each value is an inner
      * cache mapping a requested key to the path of the temporary file holding its contents. Every
      * inner cache owns its own temporary directory.
      */
     private static final LoadingCache<List<? extends Object>, LoadingCache<String, String>> CACHES =
         CacheBuilder.newBuilder()
             .build(
                 new CacheLoader<List<? extends Object>, LoadingCache<String, String>>() {
                   @Override
                   public LoadingCache<String, String> load(List<? extends Object> cacheKey)
                       throws IOException {
                     final long ttlMillis = (long) cacheKey.get(0);
                     final String scheme = (String) cacheKey.get(1);

                     final Path tempDirectory = Files.createTempDirectory("beam-");
                     tempDirectory.toFile().deleteOnExit();

                     // Only enable refreshing when a positive TTL was configured.
                     return (ttlMillis > 0
                             ? CacheBuilder.newBuilder()
                                 .refreshAfterWrite(ttlMillis, TimeUnit.MILLISECONDS)
                             : CacheBuilder.newBuilder())
                         .build(
                             new CacheLoader<String, String>() {
                               /** Copies the single file matching {@code scheme:key} to a new temp file. */
                               @Override
                               public String load(String key) throws IOException {
                                 final MatchResult.Metadata meta =
                                     FileSystems.matchSingleFileSpec(scheme + ":" + key);

                                 return copyToTempFile(meta.resourceId(), tempDirectory).toString();
                               }

                               /**
                                * Best-effort bulk load: keys whose spec cannot be matched or copied
                                * are left out of the returned map instead of failing the batch.
                                */
                               @Override
                               public Map<String, String> loadAll(Iterable<? extends String> keys)
                                   throws IOException {
                                 final List<String> specs =
                                     Streams.stream(keys)
                                         .map(e -> scheme + ":" + e)
                                         .collect(Collectors.toList());
                                 final List<MatchResult> matches =
                                     FileSystems.match(specs, EmptyMatchTreatment.ALLOW);
                                 final HashMap<String, String> result = new HashMap<>(specs.size());

                                 final Iterator<String> specIter = specs.iterator();
                                 final Iterator<MatchResult> matchIter = matches.iterator();
                                 while (specIter.hasNext() && matchIter.hasNext()) {
                                   final String spec = specIter.next();
                                   final MatchResult match = matchIter.next();

                                   final List<MatchResult.Metadata> metadata;
                                   try {
                                     metadata = match.metadata();
                                   } catch (IOException ignored) {
                                     // A failed match for one spec must not abort the other specs.
                                     continue;
                                   }

                                   // Strip the "scheme:" prefix to recover the requested key.
                                   final String key = spec.substring(scheme.length() + 1);
                                   for (MatchResult.Metadata meta : metadata) {
                                     try {
                                       result.put(
                                           key,
                                           copyToTempFile(meta.resourceId(), tempDirectory).toString());
                                     } catch (IOException ignored) {
                                       // Best effort: skip resources that cannot be copied.
                                       continue;
                                     }
                                   }
                                 }

                                 return result;
                               }

                               /**
                                * Re-copies the file into the existing temp file so that the cached
                                * path stays valid for consumers that already hold it.
                                *
                                * <p>NOTE(review): the target is truncated and rewritten in place, so
                                * a concurrent reader may presumably observe partial contents.
                                */
                               @Override
                               public ListenableFuture<String> reload(String key, String oldValue)
                                   throws IOException {
                                 final ResourceId resourceId =
                                     FileSystems.matchSingleFileSpec(scheme + ":" + key).resourceId();

                                 copy(resourceId, Paths.get(oldValue));

                                 return Futures.immediateFuture(oldValue);
                               }
                             });
                   }
                 });

     /** Refresh interval taken from {@code ttl.ms}; 0 until {@link #configure(Map)} is called. */
     private long ttlMillis = 0L;

     /**
      * Streams the contents of {@code source} into the existing file {@code target}, replacing
      * whatever the file held before.
      *
      * @param source resource to read through {@link FileSystems#open}
      * @param target existing local file to overwrite
      * @throws IOException if opening, reading or writing fails
      */
     private static void copy(ResourceId source, Path target) throws IOException {
       try (ReadableByteChannel src = FileSystems.open(source);
           WritableByteChannel dst =
               Files.newByteChannel(
                   target, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
         ByteStreams.copy(src, dst);
       }
     }

     /**
      * Creates a new {@code .tmp} file in {@code directory}, registers it for deletion on JVM exit
      * and fills it with the contents of {@code source}.
      *
      * @param source resource to copy
      * @param directory directory in which the temporary file is created
      * @return path of the populated temporary file
      * @throws IOException if the file cannot be created or the copy fails; the temporary file is
      *     removed again in the latter case
      */
     private static Path copyToTempFile(ResourceId source, Path directory) throws IOException {
       final Path tempFile = Files.createTempFile(directory, "", ".tmp");
       tempFile.toFile().deleteOnExit();

       try {
         copy(source, tempFile);
       } catch (IOException | RuntimeException ex) {
         // Do not leave an empty or partially written file behind for the JVM's lifetime.
         try {
           Files.deleteIfExists(tempFile);
         } catch (IOException cleanupFailure) {
           ex.addSuppressed(cleanupFailure);
         }
         throw ex;
       }

       return tempFile;
     }

     /**
      * Reads {@code ttl.ms} from the supplied configuration.
      *
      * @param configs provider configuration; validated against {@link Config#CONFIG}
      */
     @Override
     public void configure(Map<String, ?> configs) {
       final Config cfg = new Config(configs);

       ttlMillis = cfg.getLong(Config.TTL_MS_CONFIG);
     }

     /** No-op: the caches are static and the temporary files are registered for deletion on exit. */
     @Override
     public void close() throws IOException {}

     /**
      * Returns a single entry mapping {@code path} to itself; nothing is fetched or copied.
      *
      * @param path the path passed by Kafka's config substitution
      * @return config data containing only {@code path -> path}
      */
     @Override
     public ConfigData get(String path) {
       return new ConfigData(Collections.singletonMap(path, path));
     }

     /**
      * Resolves each key to the path of a local temporary copy of the file {@code path + ":" + key}.
      *
      * <p>This method never propagates an exception. If some keys cannot be loaded, the entries
      * that are available in the cache are still returned; if the cache for {@code path} cannot be
      * obtained at all, the result is empty.
      *
      * @param path file system scheme used to build the file specs
      * @param keys scheme-specific parts of the files to resolve
      * @return config data mapping each resolved key to a local file path
      */
     @Override
     public ConfigData get(String path, Set<String> keys) {
       try {
         final LoadingCache<String, String> cache = CACHES.get(Arrays.asList(ttlMillis, path));
         try {
           return new ConfigData(cache.getAll(keys));
         } catch (Exception loadFailure) {
           // getAll fails as a whole when the loader leaves out any requested key, but the entries
           // it did produce are cached; return those instead of discarding them.
           return new ConfigData(cache.getAllPresent(keys));
         }
       } catch (Exception ex) {
         return new ConfigData(Collections.emptyMap());
       }
     }
   }
   
   ```
   Note that the cache entries reload lazily: `refreshAfterWrite` only triggers 
a reload when a stale entry is read. Because the configuration substitution 
only happens once, a `ScheduledExecutorService` is needed to make sure that the 
entries are reloaded regularly. I guess it's fine to remove `refreshAfterWrite` 
and just set up a scheduled task with fixed delay to reload the file contents 
instead of doing so in the `reload` method.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to